package org.zjvis.datascience.service;

import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.dozermapper.core.MappingException;
import com.google.api.client.util.Sets;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.zjvis.datascience.common.algo.BaseAlg;
import org.zjvis.datascience.common.constant.DatabaseConstant;
import org.zjvis.datascience.common.constant.DatasetConstant;
import org.zjvis.datascience.common.constant.SqlTemplate;
import org.zjvis.datascience.common.dto.*;
import org.zjvis.datascience.common.dto.dataset.DatasetJsonInfo;
import org.zjvis.datascience.common.enums.*;
import org.zjvis.datascience.common.etl.BaseETL;
import org.zjvis.datascience.common.etl.Join;
import org.zjvis.datascience.common.etl.PivotTable;
import org.zjvis.datascience.common.etl.Union;
import org.zjvis.datascience.common.exception.*;
import org.zjvis.datascience.common.model.ApiResultCode;
import org.zjvis.datascience.common.model.Table;
import org.zjvis.datascience.common.model.stat.ColumnAction;
import org.zjvis.datascience.common.model.stat.ColumnConstant;
import org.zjvis.datascience.common.sql.SqlHelper;
import org.zjvis.datascience.common.util.*;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.common.vo.MLModelVO;
import org.zjvis.datascience.common.vo.TaskSaveVO;
import org.zjvis.datascience.common.vo.TaskVO;
import org.zjvis.datascience.common.vo.column.ColumnQueryVO;
import org.zjvis.datascience.common.widget.dto.WidgetDTO;
import org.zjvis.datascience.service.dag.TaskManager;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;
import org.zjvis.datascience.service.dataset.DatasetService;
import org.zjvis.datascience.service.dataset.ImportDataService;
import org.zjvis.datascience.service.graph.GraphService;
import org.zjvis.datascience.service.mapper.TaskInstanceMapper;
import org.zjvis.datascience.service.mapper.TaskMapper;


import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;

import static org.zjvis.datascience.common.constant.DatabaseConstant.GREEN_PLUM_DEFAULT_SCHEMA;
import static org.zjvis.datascience.common.constant.IdConstant.QUERY_RESULT_IDX;

/**
 * @description Service for Task nodes (pipeline task node operations)
 * @date 2021-12-29
 */
@Service
@SuppressWarnings("all")
public class TaskService {

    // Logger registered under the literal name "TaskService".
    private final static Logger logger = LoggerFactory.getLogger("TaskService");

    // NOTE(review): presumably a row cap used when sampling data; it is not referenced
    // in the part of the file visible here — confirm against its usages.
    protected final static int SAMPLE_NUMBER = 5000;

    // TODO: move this elsewhere.
    // SQL template invoking pipeline.sys_func_pivot_table_output_col with five format arguments.
    private static String PIVOT_TABLE_SQL = "select pipeline.sys_func_pivot_table_output_col(%s,%s,%s,%s,%s) res";

    @Lazy
    @Autowired
    private TColumnService tColumnService;

    @Autowired
    private AutoJoinService autoJoinService;

    @Autowired
    private TaskMapper taskMapper;

    @Autowired
    private EngineSelector engineSelector;

    @Autowired
    private GPDataProvider gpDataProvider;

    @Autowired
    private DatasetService datasetService;

    @Autowired
    private WidgetFavouriteService widgetFavouriteService;

    @Autowired
    private WidgetService widgetService;

    @Autowired
    private OperatorTemplateService operatorTemplateService;

    @Autowired
    private TaskInstanceService taskInstanceService;

    @Autowired
    private TaskInstanceMapper taskInstanceMapper;

    @Autowired
    private DashboardService dashboardService;

    @Autowired
    private TaskUtil taskUtil;

    @Autowired
    private TaskManager taskManager;

    @Lazy
    @Autowired
    private GraphService graphService;

    @Autowired
    private DatasetCategoryService datasetCategoryService;

    @Autowired
    private DatasetProjectService datasetProjectService;

    @Autowired
    private MLModelService mlModelService;

    @Autowired
    private JlabService jlabService;

    @Autowired
    private DateService dateService;

    @Autowired
    private Environment environment;

    @Autowired
    private FastTextService fastTextService;

    /**
     * Flattens the {@code setParams} entries of a task's dataJson into top-level keys.
     *
     * <p>Every element of {@code setParams} that carries both {@code english_name} and
     * {@code value} is copied to the root object as {@code english_name -> value}; the
     * resulting JSON is then written back to the task. ALGOPY tasks, tasks whose dataJson
     * parses to {@code null}, and tasks without a {@code setParams} array are left untouched.
     *
     * @param taskDTO task whose dataJson is rewritten in place
     */
    private void adaptor(TaskDTO taskDTO) {
        // Objects.equals compares by value and tolerates a null type; the previous "=="
        // was either a boxed-reference comparison or an unboxing NPE on a null type.
        if (Objects.equals(taskDTO.getType(), TaskTypeEnum.TASK_TYPE_ALGOPY.getVal())) {
            return;
        }
        JSONObject root = JSONObject.parseObject(taskDTO.getDataJson());
        if (root == null || !root.containsKey("setParams")) {
            return;
        }
        JSONArray setParams = root.getJSONArray("setParams");
        if (setParams == null) {
            // Key present but explicitly null: nothing to flatten.
            return;
        }
        for (int i = 0; i < setParams.size(); ++i) {
            JSONObject item = setParams.getJSONObject(i);
            if (!item.containsKey("english_name") || !item.containsKey("value")) {
                continue;
            }
            root.put(item.getString("english_name"), item.get("value"));
        }
        taskDTO.setDataJson(root.toJSONString());
        logger.debug(taskDTO.getDataJson());
    }

    /**
     * Looks up a single task by id through {@code taskMapper}.
     *
     * @param id task id
     * @return the mapper's result for that id
     */
    public TaskDTO queryById(Long id) {
        return taskMapper.queryById(id);
    }

    /**
     * Looks up each id with {@link #queryById(Long)} and returns the results in the same order.
     * Entries for which the lookup returns {@code null} are kept as {@code null}.
     *
     * @param taskIds ids to look up
     * @return one entry per requested id
     */
    public List<TaskDTO> queryByIds(List<Long> taskIds) {
        return taskIds.stream()
                .map(this::queryById)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Delegates to {@code taskMapper.queryByParentId}.
     *
     * @param parentId id of the parent task
     * @return the mapper's result for that parent id
     */
    public TaskDTO queryByParentId(Long parentId) {
        TaskDTO found = taskMapper.queryByParentId(parentId);
        return found;
    }

    /**
     * Queries tasks by dataset and project.
     *
     * @param datasetId passed to the mapper under the key {@code datasetId}
     * @param projectId passed to the mapper under the key {@code projectId}
     * @return the mapper's result list
     */
    public List<TaskDTO> queryByDatasetId(Long datasetId, Long projectId) {
        Map<String, Long> criteria = new HashMap<>();
        criteria.put("datasetId", datasetId);
        criteria.put("projectId", projectId);

        return taskMapper.queryByDatasetId(criteria);
    }

    /**
     * Registers the pipeline's tasks with {@code taskUtil} unless it already contains the pipeline.
     *
     * @param pipelineId pipeline whose tasks should be loaded
     */
    public void initTaskUtil(Long pipelineId) {
        if (taskUtil.contain(pipelineId)) {
            return;
        }
        List<TaskDTO> pipelineTasks = this.queryByPipeline(pipelineId);
        taskUtil.addPipeline(pipelineId, pipelineTasks);
        taskUtil.addPipelineTask(pipelineId, pipelineTasks);
    }

    /**
     * Forwards a task change to {@code taskUtil} according to {@code opType}.
     *
     * <p>If the given task has an empty name it is re-read by id first. Recognised operations
     * are {@code add}, {@code delete}, {@code update} and {@code copy}; any other value is ignored.
     *
     * @param task   task that changed
     * @param opType operation name
     */
    public void updateTaskName(TaskDTO task, String opType) {
        this.initTaskUtil(task.getPipelineId());
        if (StringUtils.isEmpty(task.getName())) {
            // Reload so the full task, including its name, is forwarded.
            task = this.queryById(task.getId());
        }
        switch (opType) {
            case "add":
                taskUtil.add(task.getPipelineId(), task);
                return;
            case "delete":
                taskUtil.delete(task.getPipelineId(), task);
                return;
            case "update":
                taskUtil.update(task.getPipelineId(), task);
                return;
            case "copy":
                taskUtil.copy(task.getPipelineId(), task);
                return;
            default:
                return;
        }
    }

    /**
     * Copies the task's name into every {@code output} entry that already has a {@code nodeName}
     * key, then writes the updated JSON back to the task. Does nothing when the task data has no
     * {@code output} key.
     *
     * @param taskDTO task whose output entries are renamed
     */
    public void syncNodeName(TaskDTO taskDTO) {
        JSONObject data = taskDTO.view().getData();
        if (!data.containsKey("output")) {
            return;
        }
        JSONArray outputs = data.getJSONArray("output");
        int count = outputs.size();
        for (int idx = 0; idx < count; idx++) {
            JSONObject entry = outputs.getJSONObject(idx);
            if (!entry.containsKey("nodeName")) {
                continue;
            }
            entry.put("nodeName", taskDTO.getName());
            outputs.set(idx, entry);
        }
        data.put("output", outputs);
        taskDTO.setDataJson(JSONObject.toJSONString(data));
    }

    /**
     * Clears the {@code taskUtil} state of a pipeline; a {@code null} id is ignored.
     *
     * @param pipelineId pipeline to clear, may be {@code null}
     */
    public void clearTaskUtil(Long pipelineId) {
        if (pipelineId == null) {
            return;
        }
        taskUtil.clear(pipelineId);
    }

    /**
     * Re-reads the task by id and forwards it to {@code taskUtil.updateTaskNode}.
     *
     * @param pipelineId pipeline the task belongs to
     * @param dto        task whose id is used for the reload
     * @param opType     operation name passed through to {@code taskUtil}
     */
    public void updateTaskNode(Long pipelineId, TaskDTO dto, String opType) {
        this.initTaskUtil(pipelineId);
        taskUtil.updateTaskNode(pipelineId, this.queryById(dto.getId()), opType);
    }

    /**
     * Repositions nodes after a line is drawn from {@code parentId} to {@code childId}.
     *
     * <p>If the parent's {@code position.col} is already smaller than the child's, nothing is
     * moved. Otherwise the nodes returned by {@code taskUtil.getNodeWithPositionModified} are
     * persisted (id and dataJson only) and reported back to {@code taskUtil}.
     *
     * @param pipelineId pipeline containing both nodes
     * @param parentId   id of the parent node
     * @param childId    id of the child node
     * @return {@code false} if an exception occurred while updating, {@code true} otherwise
     */
    public boolean batchUpdatePositionForConnectLine(Long pipelineId, Long parentId, Long childId) {
        TaskDTO parent = this.queryById(parentId);
        TaskDTO child = this.queryById(childId);
        JSONObject parentPosition = parent.view().getData().getJSONObject("position");
        JSONObject childPosition = child.view().getData().getJSONObject("position");
        boolean parentAlreadyBefore =
                parentPosition.getInteger("col") < childPosition.getInteger("col");
        if (parentAlreadyBefore) {
            // The parent is already in front of the child, so no move is needed.
            return true;
        }
        try {
            this.initTaskUtil(pipelineId);
            List<TaskDTO> moved =
                    taskUtil.getNodeWithPositionModified(pipelineId, parentId, childId);
            for (TaskDTO node : moved) {
                // Persist only the id and the new dataJson.
                TaskDTO patch = new TaskDTO();
                patch.setId(node.getId());
                patch.setDataJson(node.getDataJson());
                this.update(patch);
            }
            taskUtil.updateNodeMapForSuccess(pipelineId, moved);
            return true;
        } catch (Exception e) {
            logger.error("batchUpdatePositionForConnectLine faill, errorMsg={}", e.getMessage());
            return false;
        }
    }

    /**
     * Lets {@code taskUtil} compute a position for a join/union node, persists the task and,
     * if the update succeeded, refreshes the node in {@code taskUtil}.
     *
     * @param pipelineId pipeline the task belongs to
     * @param task       join/union task to reposition
     * @return {@code true} if the task was persisted; {@code false} if the update reported
     *         failure or an exception was thrown
     */
    public boolean updatePositionForJoinOrUnion(Long pipelineId, TaskDTO task) {
        initTaskUtil(pipelineId);
        try {
            taskUtil.updatePositionForUnionOrJoinNode(pipelineId, task);
            wrapTaskDTO(task);
            adaptor(task);
            boolean persisted = this.update(task);
            if (persisted) {
                taskUtil.updateTaskNode(pipelineId, task, "update");
            }
            return persisted;
        } catch (Exception e) {
            logger.error(e.getMessage());
            return false;
        }
    }

    /**
     * Finds a suitable position for {@code task} based on its brother node and persists it.
     *
     * @param pipelineId pipeline the task belongs to
     * @param brotherId  id of the brother node used as the reference
     * @param task       task to reposition
     * @return {@code true} if the task was persisted; {@code false} if the update reported
     *         failure or an exception was thrown
     */
    public boolean updatePositionDragAddForJoinOrUnion(Long pipelineId, Long brotherId,
                                                       TaskDTO task) {
        initTaskUtil(pipelineId);
        try {
            taskUtil.updatePositionDragAddForJoinOrUnion(pipelineId, brotherId, task);
            wrapTaskDTO(task);
            adaptor(task);
            boolean persisted = this.update(task);
            if (persisted) {
                taskUtil.updateTaskNode(pipelineId, task, "update");
            }
            return persisted;
        } catch (Exception e) {
            logger.error(e.getMessage());
            return false;
        }
    }

    /**
     * Batch-updates the position information of the nodes following the current node.
     *
     * <p>The task is re-read by id, {@code taskUtil.getNodeWithPositionModified} supplies the
     * nodes to change, and each of them is persisted with its id and dataJson only.
     *
     * @param pipelineId   pipeline the task belongs to
     * @param task         current node (only its id is used)
     * @param parentId     parent id passed through to {@code taskUtil}
     * @param referChildId reference child id passed through to {@code taskUtil}
     * @return {@code false} if an exception occurred while updating, {@code true} otherwise
     */
    public boolean batchUpdatePosition(Long pipelineId, TaskDTO task, String parentId,
                                       String referChildId) {
        this.initTaskUtil(pipelineId);
        TaskDTO current = this.queryById(task.getId());
        try {
            List<TaskDTO> moved = taskUtil
                    .getNodeWithPositionModified(pipelineId, current, parentId, referChildId);
            for (TaskDTO node : moved) {
                TaskDTO patch = new TaskDTO();
                patch.setId(node.getId());
                patch.setDataJson(node.getDataJson());
                this.update(patch);
            }
            taskUtil.updateNodeMapForSuccess(pipelineId, moved);
            return true;
        } catch (Exception e) {
            logger.error("batchUpdatePosition error={}", e.getMessage());
            return false;
        }
    }

    /**
     * Reads a JSON template file through {@code ToolUtil.readJsonFile} and parses it as an array.
     *
     * @param fileName template file name
     * @return the parsed array, or {@code null} when the file content is empty
     */
    public JSONArray getTemplateParamList(String fileName) {
        String content = new ToolUtil().readJsonFile(fileName);
        return content.length() == 0 ? null : JSONArray.parseArray(content);
    }

    /**
     * Fills the data JSON of a new task node with type-specific defaults and converts the
     * view object into a {@link TaskDTO}.
     *
     * <p>The branch taken depends on {@code vo.getType()}: JLAB, ALGOPY, MODEL, ALGO, ETL,
     * DATA, CLEAN and GRAPH each populate {@code data} differently; other types leave it
     * unchanged.
     *
     * @param vo task view object; its data JSON is modified in place
     * @return the result of {@code vo.toTask()}
     */
    public TaskDTO load(TaskVO vo) {
        int type = vo.getType();
        JSONObject data = vo.getData();
        if (type == TaskTypeEnum.TASK_TYPE_JLAB.getVal()) {
            // JLAB: empty input/output plus the parameter list from the jlab template file.
            String TPL_FILENAME = "template/jlab.json";
            JSONArray setParams = getTemplateParamList(TPL_FILENAME);
            data.put("output", new JSONArray());
            data.put("input", new JSONArray());
            data.put("parentType", new JSONArray());
            data.put("outputCols", new JSONArray());
            data.put("algType", data.getLong("algType"));
            data.put("setParams", setParams);
            vo.setData(data);
        }
        else if (type == TaskTypeEnum.TASK_TYPE_ALGOPY.getVal()) {
            // ALGOPY: the template file is chosen by the AlgPyEnum entry matching the node name.
            //data.getLong("subType") == SubTypeEnum.FEATURE_ENGINEERING.getVal();
            String TPL_FILENAME = String.format("template/algoPy/%s.json", AlgPyEnum.getEnumByDesc(vo.getName()).getName());
            JSONArray setParams = getTemplateParamList(TPL_FILENAME);
            data.put("output", new JSONArray());
            data.put("input", new JSONArray());
            data.put("parentType", new JSONArray());
            data.put("outputCols", new JSONArray());
            data.put("algType", data.getLong("algType"));
            data.put("setParams", setParams);
            data.put("maxParentNumber", 1);
            vo.setData(data);
        } else if (type == TaskTypeEnum.TASK_TYPE_MODEL.getVal()) {
            // MODEL: only initialised when a modelId is supplied; model details are copied into data.
            //TODO
            if (data.get("modelId") != null) {
                Long modelId = data.getLong("modelId");
                MLModelDTO modelDTO = mlModelService.queryMetricsById(modelId);
                MLModelVO model = modelDTO.view();
                data.put("output", new JSONArray());
                data.put("input", new JSONArray());
                data.put("parentType", new JSONArray());
                data.put("outputCols", new JSONArray());
                data.put("algType", 7);
                data.put("modelName", model.getName());
                data.put("param", model.getParam());
                data.put("modelInfo", model.getData());
                data.put("modelDesc", model.getModelDesc());
                data.put("algorithm", model.getAlgorithm());
                data.put("modelPath", model.getModelPath());
                data.put("gmtCreate", model.getGmtCreate());
                //JSONObject modelJson = JSONObject.parseObject(modelDTO.toString());
                vo.setData(data);
            }
        } else if (type == TaskTypeEnum.TASK_TYPE_ALGO.getVal()) {
            // ALGO: the algorithm implementation initialises its own template.
            BaseAlg alg = BaseAlg.instance(data);
            alg.initTemplate(data);
            vo.setData(data);
        } else if (type == TaskTypeEnum.TASK_TYPE_ETL.getVal()) {
            // ETL: the ETL implementation initialises its own template.
            BaseETL etl = BaseETL.instance(data);
            etl.initTemplate(data);
            vo.setData(data);
        } else if (type == TaskTypeEnum.TASK_TYPE_DATA.getVal()) {
            // DATA: build the single output entry from the dataset's stored JSON and table metadata.
            Long datasetId = data.getLong("dataset_id");
            DatasetDTO datasetDTO = datasetService.queryById(datasetId);
            String dataJson = datasetDTO.getDataJson();
            JSONObject jsonObject = JSONObject.parseObject(dataJson);
            String table = jsonObject.getString("table");
            if (table.contains(".")) {
                // Keep only the part after the first dot (drops the qualifier before it).
                table = table.split("\\.")[1];
            }

            // Collect the per-column type and semantic recorded in the dataset JSON.
            JSONArray columnMessage = jsonObject.getJSONArray("columnMessage");
            JSONObject typesFromJson = new JSONObject();
            Map<String, String> semantics = new HashMap<String, String>();
            if (columnMessage != null) {
                for (Object column : columnMessage) {
                    JSONObject obj = (JSONObject) column;
                    String name = obj.getString("name");
                    String typeFromJson = obj.getString("type");
                    typesFromJson.put(name, typeFromJson);
                    String semantic = obj.getString("semantic");
                    if (semantic != null) {
                        semantics.put(name, semantic);
                    } else {
                        // A missing semantic is stored as the literal string "null".
                        semantics.put(name, "null");
                    }
                }
                semantics.put("_record_id_", "null");
            }

            List<String> tableMeta = new ArrayList<>();
            List<String> columnTypes = new ArrayList<>();
            JSONObject categoryOrder = new JSONObject();
            Map<String, String> metaMap = gpDataProvider
                    .getTableMetaMap(table.replaceAll("\"", ""));
            // DATE/JSON/ARRAY types declared in the dataset JSON override the database-reported type.
            for (Map.Entry<String, String> entry : metaMap.entrySet()) {
                String name = entry.getKey();
                tableMeta.add(name);
                String typeFromJson = typesFromJson.getString(name);
                if (DataTypeEnum.DATE.getValue().equals(typeFromJson)) {
                    columnTypes.add("DATE");
                } else if (DataTypeEnum.JSON.getValue().equals(typeFromJson)) {
                    columnTypes.add("JSON");
                } else if (DataTypeEnum.ARRAY.getValue().equals(typeFromJson)) {
                    columnTypes.add("ARRAY");
                } else {
                    columnTypes.add(entry.getValue());
                }
            }

            data.put("outputCols", tableMeta);
            data.put("input", new JSONArray());
            JSONObject item = new JSONObject();
            item.put("tableName", jsonObject.getString("table"));
            item.put("tableCols", tableMeta);
            item.put("nodeName", vo.getName());
            item.put("columnTypes", columnTypes);
            item.put("semantic", semantics);
            // categoryOrder is never populated in this method, so this branch is not taken here.
            if (categoryOrder.size() > 0) {
                item.put("categoryOrder", categoryOrder);
            }
            setTotalRow(item, jsonObject, datasetDTO);// set the total row count of the data
            JSONArray outputTables = new JSONArray();
            outputTables.add(item);
            data.put("output", outputTables);
            vo.setData(data);
        } else if (type == TaskTypeEnum.TASK_TYPE_CLEAN.getVal()) {
            // CLEAN: fixed data-cleaning metadata.
            // NOTE(review): unlike the other branches, vo.setData(data) is not called here — confirm this is intended.
            data.put("input", new JSONArray());
            data.put("output", new JSONArray());
            data.put("subTypeName", SubTypeEnum.DATA_CLEANING.getDesc());
            data.put("maxParentNumber", 1);
            data.put("subType", SubTypeEnum.DATA_CLEANING.getVal());
            data.put("parentType", new JSONArray());
            data.put("algName", "DATA_CLEANING");
        } else if (type == TaskTypeEnum.TASK_TYPE_GRAPH.getVal()) {
            // GRAPH: initialised through BaseAlg, same as ALGO.
            BaseAlg alg = BaseAlg.instance(data);
            alg.initTemplate(data);
            vo.setData(data);
        }
        return vo.toTask();
    }

    /**
     * Stores the dataset's total row count in the node's output item.
     *
     * <p>When the dataset config has no {@code totalRow}, the count is fetched from
     * {@code gpDataProvider}, cached in the dataset config and persisted through
     * {@code datasetService.update}.
     *
     * @param outputItem    output entry that receives {@code totalRow}
     * @param datasetConfig parsed dataset JSON, possibly updated with {@code totalRow}
     * @param datasetDTO    dataset persisted when the count had to be computed
     */
    private void setTotalRow(JSONObject outputItem, JSONObject datasetConfig,
                             DatasetDTO datasetDTO) {
        Long cachedTotal = datasetConfig.getLong("totalRow");
        if (cachedTotal != null) {
            outputItem.put("totalRow", cachedTotal);
            return;
        }
        // NOTE(review): the schema is prepended unconditionally; confirm that "table" is
        // never already schema-qualified at this point.
        String qualifiedTable = GREEN_PLUM_DEFAULT_SCHEMA +
                "." + datasetConfig.getString("table");
        Long counted = gpDataProvider.getRecordCount(qualifiedTable);
        datasetConfig.put("totalRow", counted);
        datasetDTO.setDataJson(datasetConfig.toJSONString());
        datasetService.update(datasetDTO);
        outputItem.put("totalRow", counted);
    }

    /**
     * Sets {@code totalRow} on every entry of the task's {@code output} array and writes the
     * updated JSON back to the task.
     *
     * <p>A failure to count one table is logged, with its stack trace, and does not stop the
     * remaining entries from being processed. A task without an {@code output} array is left
     * unchanged.
     *
     * @param dto task whose output entries are updated in place
     */
    public void setTotalRowForOutput(TaskDTO dto) {
        JSONObject conf = dto.view().getData();
        JSONArray output = conf.getJSONArray("output");
        if (output == null) {
            // Nothing to count; previously this case failed with a NullPointerException.
            return;
        }
        for (int i = 0; i < output.size(); ++i) {
            JSONObject item = output.getJSONObject(i);
            String nameTemplate = item.getString("tableName");
            String tableName = ToolUtil.alignTableName(this.getFullTableName(nameTemplate), 0L);
            try {
                Long totalRow = gpDataProvider.getRecordCount(tableName);
                item.put("totalRow", totalRow);
                output.set(i, item);
            } catch (Exception e) {
                // Best effort: log through the logger (not stderr) and keep the stack trace.
                logger.error("setTotalRowForOutput fail, table={}", tableName, e);
            }
        }
        dto.setDataJson(conf.toJSONString());
    }

    /**
     * Builds the SQL for an algorithm node.
     *
     * @param conf       task configuration
     * @param sqlHelpers one helper per input table
     * @param timeStamp  timestamp passed through to the algorithm
     * @return the SQL produced by the algorithm, or an empty string when no algorithm
     *         instance could be created from {@code conf}
     */
    private String initSqlFromAlgo(JSONObject conf, List<SqlHelper> sqlHelpers, long timeStamp) {
        BaseAlg algorithm = BaseAlg.instance(conf);
        return algorithm == null
                ? ""
                : algorithm.initSql(conf, sqlHelpers, timeStamp, engineSelector.getEngineName());
    }

    /**
     * Builds the SQL for an ETL node.
     *
     * <p>For JOIN/UNION nodes with {@code autoJoin} set to {@code "yes"} and whose
     * {@code isSample} is absent, {@code "SUCCESS"} or {@code "FAIL"}, the configuration is
     * first replaced by the auto-join recommendation, its {@code conditions} are reset and
     * {@code autoJoin} is switched to {@code "no"}.
     *
     * @param conf      task configuration
     * @param helpers   one helper per input table
     * @param timeStamp timestamp passed through to the ETL implementation
     * @param taskDTO   task providing the parent ids and the task id for the recommendation
     * @return the SQL produced by the ETL implementation, or an empty string when no ETL
     *         instance could be created from {@code conf}
     */
    private String initSqlFromETL(JSONObject conf, List<SqlHelper> helpers, long timeStamp,
                                  TaskDTO taskDTO) {
        BaseETL etl = BaseETL.instance(conf);
        if (etl == null) {
            return "";
        }
        int type = conf.getIntValue("algType");
        if (ETLEnum.JOIN.getVal() == type || ETLEnum.UNION.getVal() == type) {
            // Evaluated in the same order as a single short-circuit condition.
            boolean sampleStateOk = !conf.containsKey("isSample")
                    || conf.getString("isSample").equals("SUCCESS")
                    || conf.getString("isSample").equals("FAIL");
            if (sampleStateOk
                    && conf.containsKey("autoJoin")
                    && conf.getString("autoJoin").equals("yes")) {
                List<String> parentIds = Arrays.asList(taskDTO.getParentId().split(","));
                conf = autoJoinService.autoJoinRecommend(conf, parentIds, taskDTO.getId());
                conf.put("conditions", new JSONArray());
                conf.put("autoJoin", "no");
            }
        }
        etl.parserConf(conf);
        return etl.initSql(conf, helpers, timeStamp, engineSelector.getEngineName());
    }

    /**
     * TODO AY
     * Intended to assemble the SQL (command) executed for an ML model node.
     * Not implemented yet: the arguments are ignored.
     *
     * @param conf      task configuration (currently unused)
     * @param timeStamp timestamp (currently unused)
     * @param task      model task (currently unused)
     * @return always the empty string
     */
    public String initSqlFromModel(JSONObject conf, Long timeStamp, TaskDTO task) {
        return StringUtils.EMPTY;
    }

    /**
     * Matches the feature columns in {@code param} against the columns of the first input table.
     *
     * <p>Runs only when {@code autoJoin} is {@code "yes"}. For every feature column the
     * recommendation with the highest score above 0.8 decides the matched column; columns
     * without such a recommendation are mapped to {@code null}. The result is stored under
     * {@code match}, {@code autoJoin} is set to {@code "no"} and the task is persisted.
     *
     * @param conf task configuration, updated in place
     * @param task task that receives the updated configuration and is saved
     */
    public void matchParam(JSONObject conf, TaskDTO task) {
        if (!conf.containsKey("autoJoin") || !conf.getString("autoJoin").equals("yes")) {
            return;
        }
        JSONObject param = conf.getJSONObject("param");
        JSONArray featureCols = param.getJSONArray("feature_col");
        String leftCols = Joiner.on(",").join(featureCols);
        String leftTable = param.getString("source");
        JSONObject leftSemantics = new JSONObject();
        for (Object featureCol : featureCols) {
            leftSemantics.put((String) featureCol, null);
        }

        JSONObject firstInput = conf.getJSONArray("input").getJSONObject(0);
        String rightTable = firstInput.getString("tableName");
        if (!rightTable.contains(".")) {
            rightTable = "dataset." + rightTable;
        }
        JSONObject rightSemantics = firstInput.getJSONObject("semantic");

        JSONArray recommendations = autoJoinService
            .autoJoinRecommend(leftCols, "*", leftTable,
                rightTable, leftSemantics, rightSemantics,
                fastTextService.getAutojoinModel(), -1);

        JSONObject match = new JSONObject();
        for (Object featureCol : featureCols) {
            String column = (String) featureCol;
            double best = 0;
            for (Object candidate : recommendations) {
                JSONObject recommendation = (JSONObject) candidate;
                if (!column.equals(recommendation.getString("leftHeaderName"))) {
                    continue;
                }
                double score = recommendation.getDouble("score");
                if (score > 0.8 && score > best) {
                    best = score;
                    match.put(column, recommendation.getString("rightHeaderName"));
                }
            }
            if (best == 0) {
                // No recommendation passed the threshold for this column.
                match.put(column, null);
            }
        }
        conf.put("match", match);
        conf.put("autoJoin", "no");
        task.setDataJson(conf.toJSONString());
        update(task);
    }

    /**
     * Builds the SQL for an ALGO or ETL task and persists the configuration afterwards.
     *
     * <p>One {@link SqlHelper} is bound per input table from its {@code tableCols} and
     * {@code columnTypes}. Tasks without inputs, and tasks of any other type (including
     * MODEL, which is not handled here), produce an empty string.
     *
     * @param task      task to build SQL for; its dataJson is rewritten for ALGO/ETL tasks
     * @param timeStamp timestamp used when aligning table names and building the SQL
     * @return the generated SQL, or an empty string
     */
    public String initSql(TaskDTO task, long timeStamp) {
        List<SqlHelper> helpers = new ArrayList<>();
        JSONArray inputs = task.view().getData().getJSONArray("input");
        if (inputs.size() <= 0) {
            return StringUtils.EMPTY;
        }
        for (int idx = 0; idx < inputs.size(); idx++) {
            JSONObject inputItem = inputs.getJSONObject(idx);
            // The aligned name is not used further; the call is kept as in the original flow.
            ToolUtil.alignTableName(inputItem.getString("tableName"), timeStamp);
            SqlHelper helper = new SqlHelper();
            List<String> columns = inputItem.getJSONArray("tableCols").toJavaList(String.class);
            List<String> types = inputItem.getJSONArray("columnTypes").toJavaList(String.class);
            Map<String, Integer> typeByColumn = ToolUtil.buildColumnTypes(columns, types);
            helper.bind(typeByColumn);
            helpers.add(helper);
        }

        JSONObject conf = JSONObject.parseObject(task.getDataJson());
        int type = task.getType();
        final String sql;
        if (TaskTypeEnum.TASK_TYPE_ALGO.getVal() == type) {
            sql = initSqlFromAlgo(conf, helpers, timeStamp);
        } else if (TaskTypeEnum.TASK_TYPE_ETL.getVal() == type) {
            sql = initSqlFromETL(conf, helpers, timeStamp, task);
        } else {
            return StringUtils.EMPTY;
        }
        task.setDataJson(JSONObject.toJSONString(conf));
        this.update(task);
        return sql;
    }


    /**
     * Validates the incoming lines of a task: no duplicate parents, and no more parents than
     * the node type allows.
     *
     * <p>Only ALGO and ETL tasks define a maximum parent count; for other types, or when the
     * maximum is {@code -1}, only the duplicate check applies.
     *
     * @param taskDTO   task whose comma-separated parent ids are checked
     * @param errorCode receives the error code when validation fails
     * @return {@code true} when the lines are valid
     */
    private boolean validateLines(TaskDTO taskDTO, List<ApiResultCode> errorCode) {
        String[] ids = taskDTO.getParentId().split(",");
        Set<String> seen = new HashSet<>();
        for (String id : ids) {
            if (!seen.add(id)) {
                errorCode.add(ApiResultCode.LINE_DUP_ERROR);
                return false;
            }
        }

        Integer type = taskDTO.getType();
        int maxNumber = -1;
        if (type.equals(TaskTypeEnum.TASK_TYPE_ALGO.getVal())) {
            maxNumber = BaseAlg.instance(taskDTO.view().getData()).getMaxParentNumber();
        } else if (type.equals(TaskTypeEnum.TASK_TYPE_ETL.getVal())) {
            maxNumber = BaseETL.instance(taskDTO.view().getData()).getMaxParentNumber();
        }

        if (maxNumber == -1 || ids.length <= maxNumber) {
            return true;
        }
        switch (maxNumber) {
            case 1:
                errorCode.add(ApiResultCode.ALLOW_ONE_PARENT);
                break;
            case 2:
                errorCode.add(ApiResultCode.ALLOW_TWO_PARENT);
                break;
            case 3:
                errorCode.add(ApiResultCode.ALLOW_THREE_PARENT);
                break;
            default:
                errorCode.add(ApiResultCode.ALLOW_PARENT_ERROR);
                break;
        }
        return false;
    }

    /**
     * Adds an id to a comma-separated childId or parentId list.
     *
     * <p>Existing ids keep their original order and duplicates are collapsed; the new id is
     * appended at the end unless it is already present. (A plain {@code HashSet} was used
     * before, which returned the ids in an arbitrary hash order.)
     *
     * @param ids comma-separated id list, may be {@code null} or empty
     * @param id  id to add, must not be {@code null}
     * @return the resulting comma-separated list
     * @throws NullPointerException if {@code id} is {@code null}
     */
    public String appendIds(String ids, String id) {
        Objects.requireNonNull(id, "id");
        Set<String> ordered = new LinkedHashSet<>();
        if (ids != null && !ids.isEmpty()) {
            ordered.addAll(Arrays.asList(ids.split(",")));
        }
        ordered.add(id);
        return String.join(",", ordered);
    }

    /**
     * Removes an id from a comma-separated childId or parentId list.
     *
     * <p>The remaining ids keep their original order and duplicates are collapsed. (A plain
     * {@code HashSet} was used before, which returned the ids in an arbitrary hash order.)
     *
     * @param ids comma-separated id list, may be {@code null} or empty
     * @param id  id to remove; ids that are not present are ignored
     * @return the resulting comma-separated list, empty when no ids remain
     */
    public String removeIds(String ids, String id) {
        Set<String> ordered = new LinkedHashSet<>();
        if (ids != null && !ids.isEmpty()) {
            ordered.addAll(Arrays.asList(ids.split(",")));
        }
        ordered.remove(id);
        return String.join(",", ordered);
    }

    /**
     * Registers the current task as a child on each of its parent tasks.
     *
     * <p>Parents that cannot be found are skipped; a parent is only updated when the current
     * id is not yet in its child list.
     *
     * @param task      current node
     * @param errorCode not used by this method
     */
    public void modifyChildRelationForParent(TaskDTO task, List<ApiResultCode> errorCode) {
        String parentIds = task.getParentId();
        String currentId = task.getId().toString();
        if (StringUtils.isEmpty(parentIds)) {
            return;
        }
        for (String parentId : parentIds.split(",")) {
            TaskDTO parent = this.queryById(Long.parseLong(parentId));
            if (parent == null) {
                continue;
            }
            String existingChildren = parent.getChildId();
            Set<String> children = StringUtils.isNotEmpty(existingChildren)
                    ? new HashSet<>(Arrays.asList(existingChildren.split(",")))
                    : new HashSet<>();
            if (!children.add(currentId)) {
                // Already registered as a child of this parent.
                continue;
            }
            logger.debug("add child={} for task={}", currentId, parent.getId());
            // Update only the childId column of the parent.
            TaskDTO patch = new TaskDTO();
            patch.setId(parent.getId());
            patch.setChildId(Joiner.on(",").join(children));
            this.update(patch);
        }
    }

    /**
     * Associates the task with its parent nodes.
     *
     * <p>Does nothing when the task has no parent ids. When line validation fails, the
     * failure is reported through {@code errorCode} and no association takes place.
     *
     * @param task      task to associate
     * @param errorCode receives validation error codes
     * @throws SqlParserException declared by this method; presumably raised while
     *                            associating the parent nodes — confirm
     */
    public void associateParents(TaskDTO task, List<ApiResultCode> errorCode)
            throws SqlParserException {
        String parentIds = task.getParentId();
        if (StringUtils.isEmpty(parentIds)) {
            return;
        }
        if (!validateLines(task, errorCode)) {
            logger.error("line validate faill!!!");
            return;
        }
        List<TaskDTO> parents = new ArrayList<>();
        for (String parentId : parentIds.split(",")) {
            parents.add(queryById(Long.parseLong(parentId)));
        }
        task.associateParentNode(parents);
    }

    /**
     * Returns the tasks of a pipeline as reported by {@code taskMapper}.
     *
     * @param pipelineId pipeline id
     * @return the mapper's result list
     */
    public List<TaskDTO> queryByPipeline(Long pipelineId) {
        List<TaskDTO> pipelineTasks = taskMapper.queryByPipeline(pipelineId);
        return pipelineTasks;
    }

    /**
     * Normalises the task's dataJson, associates it with its parents and inserts it.
     *
     * <p>A {@link SqlParserException} during parent association is logged and the task is
     * still saved. Validation error codes produced during association are discarded.
     *
     * @param task task to insert
     * @return the task's id after the insert
     */
    @Transactional
    public Long save(TaskDTO task) {
        adaptor(task);
        try {
            associateParents(task, new ArrayList<>());
        } catch (SqlParserException e) {
            // Log the exception itself: e.getCause() may be null, which previously turned
            // this handler into a NullPointerException.
            logger.error("associateParents failed while saving task", e);
        }
        taskMapper.save(task);
        return task.getId();
    }

    /**
     * Inserts the task as is, without adapting its dataJson or associating parents.
     *
     * @param task task to insert
     * @return the task's id after the insert
     */
    public Long simpleSave(TaskDTO task) {
        taskMapper.save(task);
        Long savedId = task.getId();
        return savedId;
    }

    /**
     * Prepares a task for an update: normalises its dataJson, associates its parents,
     * wraps it and normalises it again.
     *
     * <p>A {@link SqlParserException} during parent association is logged and does not by
     * itself fail the preparation.
     *
     * @param task      task to prepare
     * @param errorCode receives validation error codes
     * @return {@code false} when {@code errorCode} is non-empty after parent association,
     *         {@code true} otherwise
     */
    public boolean preUpdate(TaskDTO task, List<ApiResultCode> errorCode) {
        adaptor(task);
        try {
            associateParents(task, errorCode);
        } catch (SqlParserException e) {
            // Log the exception itself: e.getCause() may be null, which previously turned
            // this handler into a NullPointerException.
            logger.error("associateParents failed while preparing task update", e);
        }
        if (!errorCode.isEmpty()) {
            return false;
        }
        wrapTaskDTO(task);
        adaptor(task);
        return true;
    }

    /**
     * Convenience overload of {@code simpleUpdate} that passes {@code null} as the request.
     *
     * @param task      task to update
     * @param errorCode receives error codes
     * @return the result of the three-argument overload
     */
    public boolean simpleUpdate(TaskDTO task, List<ApiResultCode> errorCode) {
        HttpServletRequest noRequest = null;
        return simpleUpdate(task, errorCode, noRequest);
    }

    /**
     * Tells whether two output entries have different {@code tableCols}, ignoring order
     * (both column lists are sorted with {@code ToolUtil.sortList} before being compared).
     *
     * @param oldItem previous output entry
     * @param newItem current output entry
     * @return {@code true} when the column lists differ in size or content
     */
    private boolean isDiffForOutTableCols(JSONObject oldItem, JSONObject newItem) {
        List<String> previous = oldItem.getJSONArray("tableCols").toJavaList(String.class);
        List<String> current = newItem.getJSONArray("tableCols").toJavaList(String.class);
        ToolUtil.sortList(previous);
        ToolUtil.sortList(current);
        if (previous.size() != current.size()) {
            return true;
        }
        int count = previous.size();
        for (int idx = 0; idx < count; idx++) {
            if (!previous.get(idx).equals(current.get(idx))) {
                return true;
            }
        }
        return false;
    }

    /**
     * Compares the given task's output with the version returned by {@code queryById}
     * for the same id.
     *
     * @param oldTask task holding the previous output
     * @return {@code true} when the output columns differ
     */
    private boolean isOutputChanged(TaskDTO oldTask) {
        return isOutputChanged(oldTask, this.queryById(oldTask.getId()));
    }

    /**
     * Tells whether the {@code output} arrays of two tasks differ, either in the number of
     * entries or in the {@code tableCols} of an entry at the same index.
     *
     * @param oldTask task holding the previous output
     * @param newTask task holding the current output
     * @return {@code true} when the outputs differ
     */
    private boolean isOutputChanged(TaskDTO oldTask, TaskDTO newTask) {
        JSONObject previousData = oldTask.view().getData();
        JSONObject currentData = newTask.view().getData();
        JSONArray previousOutput = previousData.getJSONArray("output");
        JSONArray currentOutput = currentData.getJSONArray("output");
        if (previousOutput.size() != currentOutput.size()) {
            return true;
        }
        for (int idx = 0; idx < previousOutput.size(); idx++) {
            if (isDiffForOutTableCols(previousOutput.getJSONObject(idx),
                    currentOutput.getJSONObject(idx))) {
                return true;
            }
        }
        return false;
    }

    /**
     * Updates a task node, either as a "simple" update (payload carries a truthy
     * {@code isSimple}) or as a configuration change.
     *
     * <p>The incoming payload is first merged onto the stored configuration through
     * {@code wrapTaskDTO}. A simple update only runs {@code adaptor}, renames the output
     * entries when the task has a name, and persists. A configuration change re-associates
     * parents, flags {@code autoJoin} when the output changed (always for MODEL / ALGOPY
     * tasks), persists, and then propagates to descendants.
     *
     * @param task      the task carrying the new values; its dataJson is rewritten in place
     * @param errorCode receives error codes reported by {@code associateParents} and by the
     *                  descendant update
     * @param request   not read by this method
     * @return {@code true} on success or when an {@code AutoTriggerException} occurred (the
     *         exception is stored on {@code task}); {@code false} when persisting fails,
     *         error codes were collected, or any other exception occurred
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean simpleUpdate(TaskDTO task, List<ApiResultCode> errorCode, HttpServletRequest request) {
        try {
            // Snapshot of the incoming payload, taken before it is merged with the stored one.
            JSONObject data = task.view().getData();
            boolean isSimple = false;
            if (data.containsKey("isSimple")) {
                isSimple = data.getBoolean("isSimple");
            }
            wrapTaskDTO(task);

            if (data.containsKey("algType") &&
                    data.getInteger("algType").equals(ETLEnum.SQL.getVal()) &&
                    data.containsKey("connected")) {
                // A parent is being connected to a SQL node: reset sql and adjustSql.
                JSONObject tmpJson = JSONObject.parseObject(task.getDataJson());
                tmpJson.put("sql", "");
                tmpJson.put("adjustSql", "");
                task.setDataJson(tmpJson.toJSONString());
            } else if (null != task.getType() && task.getType().equals(TaskTypeEnum.TASK_TYPE_ETL.getVal()) &&
                    data.containsKey("algType") &&
                    data.getInteger("algType").equals(ETLEnum.PIVOT_TABLE.getVal())) {
                // Generate the column information stored in output.
                addPivotTableOutputCols(task);
            }
            if (isSimple) {
                adaptor(task);
                JSONObject dataTmp = task.view().getData();
                if (dataTmp.containsKey("output") && dataTmp.getJSONArray("output").size() != 0) {
                    JSONArray output = dataTmp.getJSONArray("output");
                    if (StringUtils.isNotEmpty(task.getName())) {
                        task.modifyNodeName(output, task.getName());
                        dataTmp.put("output", output);
                        task.setDataJson(JSONObject.toJSONString(dataTmp));
                    }
                }
                if (!update(task)) {
                    return false;
                }
            } else {
                // Node configuration change.
                TaskDTO oldTask = this.queryById(task.getId());
                adaptor(task);
                associateParents(task, errorCode);
                if (errorCode.size() > 0) {
                    return false;
                }
                adaptor(task);
                Integer taskType = task.getType();
                // Compare boxed values with equals(): "==" on two Integer objects compares
                // references and is only reliable inside the Integer cache range.
                boolean isModelNode = TaskTypeEnum.TASK_TYPE_MODEL.getVal().equals(taskType)
                        || TaskTypeEnum.TASK_TYPE_ALGOPY.getVal().equals(taskType);
                // Model nodes are always treated as if their output changed.
                if (isModelNode || isOutputChanged(oldTask, task)) {
                    JSONObject dataObj = task.view().getData();
                    dataObj.put("autoJoin", "yes");
                    task.setDataJson(dataObj.toJSONString());
                }
                if (!this.update(task)) {
                    return false;
                }
                // NOTE(review): an older comment here said descendant propagation was
                // temporarily disabled; the call below is active.
                if (this.updateDescendantsConfig(task, errorCode)) {
                    clearTaskUtil(task.getPipelineId());
                    initTaskUtil(task.getPipelineId());
                }
            }
        } catch (MappingException e1) {
            logger.error("simpleUpdate error={}", e1.getMessage(), e1);
            return false;
        } catch (AutoTriggerException e2) {
            task.setException(e2);
            logger.error("simpleUpdate error={}", e2.getMessage(), e2);
            return true;
        } catch (Exception e3) {
            task.setException(e3);
            logger.error("simpleUpdate error={}", e3.getMessage(), e3);
            return false;
        }
        return true;
    }

    /**
     * Fills the first output entry of a pivot-table task with the column names, column
     * types and semantic map of the pivot result.
     *
     * <p>The column names are obtained by running {@code PIVOT_TABLE_SQL}, built from the
     * task's {@code rows}, {@code cols}, {@code colSize}, {@code statistics} and the first
     * input's table name. Columns that also exist in the input keep the input's type; all
     * others are typed as {@code DataTypeEnum.DECIMAL}. Does nothing when the task has no
     * input. Database or processing failures are logged and leave the task unchanged.
     *
     * @param task the pivot-table task; its dataJson is rewritten when columns were found
     */
    private void addPivotTableOutputCols(TaskDTO task) {
        JSONObject newData = task.view().getData();
        JSONArray inputs = newData.getJSONArray("input");
        if (inputs == null || inputs.isEmpty()) {
            return;
        }
        JSONObject input = inputs.getJSONObject(0);

        String rows = String.join(",", newData.getJSONArray("rows").toJavaList(String.class));

        // Builds "array[v1,v2,...]"; stays "array[]" when no cols are configured.
        JSONArray cols = newData.getJSONArray("cols");
        StringBuilder colBuilder = new StringBuilder("array[");
        if (cols != null) {
            for (int i = 0; i < cols.size(); i++) {
                if (i > 0) {
                    colBuilder.append(",");
                }
                colBuilder.append(SqlUtil.formatValue(cols.getString(i)));
            }
        }
        String colStr = colBuilder.append("]").toString();

        Integer colSize = newData.getIntValue("colSize");

        String statisticsStr = PivotTable.resetDuplicateData(newData.getJSONArray("statistics")).toJSONString();
        String table = input.getString("tableName");
        table = ToolUtil.alignTableName(table, 0);

        String sql = String.format(PIVOT_TABLE_SQL, SqlUtil.formatValue(table), SqlUtil.formatValue(rows), colStr, colSize, SqlUtil.formatValue(statisticsStr));

        Connection conn = null;

        try {
            String[] tableCols = null;
            conn = gpDataProvider.getConn(1L);
            // Statement and ResultSet are closed here; the finally block only releases
            // the connection.
            try (Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery(sql)) {
                while (rs.next()) {
                    tableCols = rs.getString(1).split(",");
                }
            }

            // Generate the column information stored in output.
            if (tableCols != null) {
                JSONArray outputs = newData.getJSONArray("output");
                List<String> outputColTypes = new ArrayList<>();
                List<String> inputCols = input.getJSONArray("tableCols").toJavaList(String.class);
                JSONArray inputColTypes = input.getJSONArray("columnTypes");
                JSONObject semantic = new JSONObject();

                for (String tableCol : tableCols) {
                    int inputIdx = inputCols.indexOf(tableCol);
                    if (inputIdx >= 0) {
                        outputColTypes.add(inputColTypes.getString(inputIdx));
                    } else {
                        outputColTypes.add(DataTypeEnum.DECIMAL.getValue());
                    }
                    semantic.put(tableCol, "null");
                }

                JSONObject firstOutput = outputs.getJSONObject(0);
                firstOutput.put("tableCols", tableCols);
                firstOutput.put("columnTypes", outputColTypes);
                firstOutput.put("semantic", semantic);
                task.setDataJson(newData.toJSONString());
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            JDBCUtil.close(conn, null, null);
        }
    }


    /**
     * Walks all descendants of {@code current} breadth-first (following {@code childId})
     * and refreshes each one: re-associates parents, runs {@code adaptor}, flags
     * {@code autoJoin} on ETL JOIN / UNION nodes whose output changed, and persists.
     *
     * @param current   the task whose descendants are refreshed
     * @param errorCode receives error codes reported by {@code associateParents}
     * @return {@code false} as soon as an error code is collected or persisting a
     *         descendant throws; {@code true} otherwise (including when there are no
     *         children)
     */
    public boolean updateDescendantsConfig(TaskDTO current, List<ApiResultCode> errorCode) {
        String childId = current.getChildId();
        if (StringUtils.isEmpty(childId)) {
            return true;
        }
        Queue<Long> queue = new ArrayDeque<>();
        for (String id : childId.split(",")) {
            queue.offer(Long.parseLong(id));
        }
        while (!queue.isEmpty()) {
            Long taskId = queue.poll();
            TaskDTO dto = this.queryById(taskId);
            // Keep an untouched copy so the output can be compared after re-association.
            TaskDTO oldDto = SerializationUtils.clone(dto);
            JSONObject conf = oldDto.view().getData();
            // Kept boxed on purpose: a descendant without "algType" or without a type must
            // not fail with a NullPointerException from auto-unboxing.
            Integer algType = conf.getInteger("algType");
            Integer type = oldDto.getType();
            try {
                this.associateParents(dto, errorCode);
            } catch (SqlParserException e) {
                // The exception may carry no cause; log the exception itself.
                logger.error("updateDescendantsConfig associateParents error={}", e.getMessage(), e);
            }
            if (errorCode.size() > 0) {
                logger.error("updateDescendants1 fail!!!!");
                return false;
            }
            adaptor(dto);
            try {
                boolean isJoinOrUnion = algType != null
                        && (algType.equals(ETLEnum.JOIN.getVal()) || algType.equals(ETLEnum.UNION.getVal()));
                if (TaskTypeEnum.TASK_TYPE_ETL.getVal().equals(type) && isJoinOrUnion) {
                    dto.setException(null);
                    dto.setGmtModify(null);
                    if (isOutputChanged(oldDto, dto)) {
                        JSONObject dataObj = dto.view().getData();
                        dataObj.put("autoJoin", "yes");
                        dto.setDataJson(dataObj.toJSONString());
                    }
                }
                // The result of update() is intentionally not checked here.
                this.update(dto);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return false;
            }
            childId = dto.getChildId();
            if (StringUtils.isNotEmpty(childId)) {
                for (String id : childId.split(",")) {
                    queue.offer(Long.parseLong(id));
                }
            }
        }
        return true;
    }

    /**
     * Special handling for a PCA algorithm inserted between two nodes: the PCA node takes
     * its feature columns from the successor node's {@code feature_cols}.
     *
     * <p>Only applies when {@code childId} names exactly one child and that child's data
     * contains {@code feature_cols}. The value is written into the {@code setParams} entry
     * at index 1 of {@code vo}'s data.
     *
     * @param vo      the PCA task view whose data is modified in place
     * @param childId comma-separated child ids of the PCA node
     */
    public void addPcaAlgorithmHandler(TaskVO vo, String childId) {
        if (StringUtils.isNotEmpty(childId) && childId.split(",").length == 1) {
            Long chId = Long.parseLong(childId);
            TaskVO childNode = this.queryById(chId).view();
            JSONObject data = childNode.getData();
            if (data.containsKey("feature_cols")) {
                JSONArray featureCols = data.getJSONArray("feature_cols");
                // Position of the parameter inside setParams that receives the columns.
                int index = 1;
                JSONObject pcaData = vo.getData();
                JSONArray params = pcaData.getJSONArray("setParams");
                JSONObject item = params.getJSONObject(index);
                item.put("value", featureCols);
                params.set(index, item);
                vo.setData(pcaData);
            }
        }
    }

    /**
     * Validates the parameters of an ALGO or ETL task.
     *
     * <p>When the given task has no type or no parent id, both values are first copied from
     * the task returned by {@code queryById}. ALGO tasks are checked with
     * {@code validateParamTypes}; ETL tasks with the {@code verify} method of the matching
     * {@code BaseETL} instance. Other task types are not checked.
     *
     * @param dto       the task to validate; type and parent id may be filled in
     * @param errorCode receives error codes reported by the validation
     * @return the validation result, or {@code true} for task types that are not checked
     */
    public boolean verifyParams(TaskDTO dto, List<ApiResultCode> errorCode) {
        boolean needsBackfill = dto.getType() == null || StringUtils.isEmpty(dto.getParentId());
        if (needsBackfill) {
            TaskDTO stored = this.queryById(dto.getId());
            dto.setParentId(stored.getParentId());
            dto.setType(stored.getType());
        }

        if (dto.getType().equals(TaskTypeEnum.TASK_TYPE_ALGO.getVal())) {
            return dto.validateParamTypes(errorCode);
        }
        if (dto.getType().equals(TaskTypeEnum.TASK_TYPE_ETL.getVal())) {
            BaseETL etl = BaseETL.instance(dto.view().getData());
            return etl.verify(dto.view(), errorCode);
        }
        return true;
    }

    /**
     * Persists the given task through {@code taskMapper.update}.
     *
     * @param task the task to update
     * @return the value returned by the mapper
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean update(TaskDTO task) {
        final boolean updated = taskMapper.update(task);
        return updated;
    }

    /**
     * Deletes a single task by id through {@code taskMapper.delete}.
     *
     * @param id the id of the task to delete
     */
    @Transactional(rollbackFor = Exception.class)
    public void delete(Long id) {
        Long[] singleId = {id};
        Map<String, Object> criteria = Maps.newHashMap();
        criteria.put("ids", singleId);
        taskMapper.delete(criteria);
    }

    /**
     * Sets {@code nodeName} of every entry in the task's {@code output} array to the
     * task's name and writes the result back to the task's dataJson.
     *
     * <p>Does nothing when dataJson is empty, cannot be parsed, or parses to no object.
     *
     * @param dto the task whose dataJson is rewritten in place
     */
    private void modifyNodeName(TaskDTO dto) {
        if (StringUtils.isEmpty(dto.getDataJson())) {
            return;
        }
        JSONObject jsonObject;
        try {
            jsonObject = JSONObject.parseObject(dto.getDataJson());
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return;
        }
        // parseObject yields null for inputs such as "null"; serializing below would
        // otherwise throw a NullPointerException.
        if (jsonObject == null) {
            return;
        }
        JSONArray output = jsonObject.getJSONArray("output");
        if (output != null) {
            for (int i = 0; i < output.size(); ++i) {
                JSONObject item = output.getJSONObject(i);
                item.put("nodeName", dto.getName());
                output.set(i, item);
            }
            jsonObject.put("output", output);
        }
        dto.setDataJson(jsonObject.toJSONString());
    }

    /**
     * Creates a copy of an existing task node.
     *
     * <p>The stored task is loaded, refused when its data marks it as {@code forbidden},
     * given the position supplied in {@code vo} (if any), stripped of its id, parent and
     * child links and timestamps, assigned to the current user, renamed through
     * {@code updateTaskName(dto, "copy")}, and saved with {@code simpleSave}.
     *
     * @param vo        carries the id of the task to copy and optionally a {@code position}
     * @param errorCode receives {@code TASK_FORBIDDEN_COPY} when the task must not be copied
     * @return the id of the new task, or {@code null} when copying is forbidden
     */
    public Long copy(TaskVO vo, List<ApiResultCode> errorCode) {
        TaskDTO dto = this.queryById(vo.getId());
        JSONObject data = JSONObject.parseObject(dto.getDataJson());
        // getBooleanValue treats a missing or null "forbidden" as false, so a null value
        // can no longer fail on auto-unboxing.
        if (data.getBooleanValue("forbidden")) {
            logger.warn("forbidden node can not copy");
            errorCode.add(ApiResultCode.TASK_FORBIDDEN_COPY);
            return null;
        }
        JSONObject position = vo.getData().getJSONObject("position");
        if (position != null) {
            data.put("position", position);
            dto.setDataJson(data.toJSONString());
        }
        dto.setId(null);
        dto.setChildId(null);
        dto.setUserId(JwtUtil.getCurrentUserId());
        this.updateTaskName(dto, "copy");
        dto.setParentId(null);
        dto.setGmtCreate(null);
        dto.setGmtModify(null);
        modifyNodeName(dto);
        TaskVO taskVO = dto.view();
        TaskDTO newDto = this.load(taskVO);
        Long id = this.simpleSave(newDto);
        newDto.setId(id);
        this.updateTaskNode(newDto.getPipelineId(), newDto, "copy");

        return id;
    }

    /**
     * Returns the task ids that {@code taskMapper} reports for the given project.
     *
     * @param projectId the project id
     * @return the ids returned by the mapper
     */
    public List<Long> queryIdByProjectId(long projectId) {
        List<Long> taskIds = taskMapper.queryIdByProjectId(projectId);
        return taskIds;
    }

    /**
     * Deletes a project's tasks together with related widget favourites, widgets and task
     * instances.
     *
     * <p>The pipelines must belong to {@code projectId}; they are passed in by the caller
     * to speed things up.
     *
     * @param pipelines ids of the project's pipelines; may be empty or {@code null}
     * @param projectId the project whose tasks are removed
     */
    public void deleteWithRelevantData(List<Long> pipelines, long projectId) {
        boolean hasPipelines = !CollectionUtils.isEmpty(pipelines);
        if (hasPipelines) {
            widgetFavouriteService.deleteByPipelines(pipelines);
        }

        widgetService.deleteByTasks(queryIdByProjectId(projectId));

        taskMapper.deleteByProjectId(projectId);
        taskInstanceMapper.deleteByProjectId(projectId);
        // TODO: should snapshots be deleted as well?
    }

    /**
     * Deletes the data related to a pipeline: widget favourites, the widgets of its tasks,
     * the tasks themselves, and the data removed by {@code dropGenerateTables} for each
     * task. The dashboard is updated for the deleted widgets.
     *
     * <p>Returns right after removing the favourites when the pipeline has no tasks.
     *
     * @param pipelineId the pipeline being deleted
     * @param projectId  the project the pipeline belongs to
     */
    public void deletePipelineRelevantData(Long pipelineId, Long projectId) {
        widgetFavouriteService.deleteByPipelines(Arrays.asList(pipelineId));
        // Delete every visualization widget of the pipeline's nodes.
        List<Long> taskIds = taskMapper.queryIdByPipeline(pipelineId);
        if (CollectionUtil.isEmpty(taskIds)) {
            return;
        }
        // Load the tasks BEFORE their rows are deleted; querying afterwards would find
        // nothing and the per-task cleanup at the end would never run.
        List<TaskDTO> taskDTOS = taskMapper.queryByPipeline(pipelineId);
        List<Long> widgetIds = widgetService.queryIdByTasks(taskIds);
        widgetService.delete(null, widgetIds);
        logger.debug("deletePipelineRelevantData({}, {}) delete taskIds={}", pipelineId, projectId,
                taskIds);
        logger.debug("deletePipelineRelevantData delete widgetIds={}", widgetIds);
        // Delete the nodes added to the pipeline.
        taskMapper.deleteByPipelineId(pipelineId);
        // Update the dashboard.
        dashboardService.updateAfterDelWidgets(widgetIds, projectId);
        taskDTOS.forEach(taskDTO -> dropGenerateTables(taskDTO));
    }

    /**
     * Deletes the given tasks through {@code taskMapper.delete}, passing the user id along
     * as a parameter. Does nothing when {@code ids} is empty or {@code null}.
     *
     * @param userId the user id passed to the mapper
     * @param ids    the ids of the tasks to delete
     */
    public void delete(Long userId, List<Long> ids) {
        if (!CollectionUtils.isEmpty(ids)) {
            Map<String, Object> criteria = Maps.newHashMap();
            criteria.put("ids", ids);
            criteria.put("userId", userId);
            taskMapper.delete(criteria);
        }
    }

    /**
     * Merges {@code currentObj} into {@code oldObj} in place.
     *
     * <p>Arrays: positions present in both are overwritten when the old element is a plain
     * value and merged recursively when it is an array or object; the old array is then
     * truncated or extended to the length of the current one. Objects: keys missing from
     * the old object are added; keys present in both are merged recursively when both
     * values are objects or both are arrays, and overwritten otherwise. Any other
     * combination of argument types is left untouched.
     *
     * @param oldObj     the structure that is modified
     * @param currentObj the structure providing the new values
     */
    private void recursion(Object oldObj, Object currentObj) {
        if (oldObj instanceof JSONArray && currentObj instanceof JSONArray) {
            JSONArray target = (JSONArray) oldObj;
            JSONArray source = (JSONArray) currentObj;
            final int targetSize = target.size();
            final int sourceSize = source.size();
            final int shared = Math.min(targetSize, sourceSize);

            for (int idx = 0; idx < shared; idx++) {
                Object existing = target.get(idx);
                if (existing instanceof JSONArray || existing instanceof JSONObject) {
                    recursion(existing, source.get(idx));
                } else {
                    target.set(idx, source.get(idx));
                }
            }
            // Drop the tail the old array has beyond the current one; removing at the same
            // index repeatedly shifts the remaining tail down.
            for (int surplus = targetSize - shared; surplus > 0; surplus--) {
                target.remove(shared);
            }
            // Append whatever the current array has beyond the old one.
            for (int idx = shared; idx < sourceSize; idx++) {
                target.add(source.get(idx));
            }
            return;
        }

        if (oldObj instanceof JSONObject && currentObj instanceof JSONObject) {
            JSONObject target = (JSONObject) oldObj;
            JSONObject source = (JSONObject) currentObj;
            for (String key : source.keySet()) {
                Object incoming = source.get(key);
                if (!target.containsKey(key)) {
                    target.put(key, incoming);
                    continue;
                }
                Object existing = target.get(key);
                boolean bothObjects = existing instanceof JSONObject && incoming instanceof JSONObject;
                boolean bothArrays = existing instanceof JSONArray && incoming instanceof JSONArray;
                if (bothObjects || bothArrays) {
                    recursion(existing, incoming);
                } else {
                    target.put(key, incoming);
                }
            }
        }
    }

    /**
     * Used on update: merges the incoming dataJson onto the stored task's dataJson (via
     * {@code recursion}) and stores the merged JSON back on the incoming task.
     *
     * @param task the incoming task; its dataJson is replaced by the merged result
     */
    public void wrapTaskDTO(TaskDTO task) {
        JSONObject merged = JSONObject.parseObject(this.queryById(task.getId()).getDataJson());
        JSONObject incoming = JSONObject.parseObject(task.getDataJson());
        recursion(merged, incoming);
        task.setDataJson(merged.toJSONString());
    }

    /**
     * Deletes the given node and unlinks it from its parents and children.
     *
     * <p>The node's id is removed from every parent's child list and from every child's
     * parent list; each touched node is persisted and passed to {@code updateTaskNode}.
     * Finally the node itself is removed.
     *
     * @param id the id of the node to delete
     * @return {@code false} when the node, one of its parents or one of its children cannot
     *         be found, when persisting a neighbour fails, or when the final removal
     *         throws; {@code true} otherwise
     */
    public boolean deleteTaskNode(Long id) {
        TaskDTO taskDTO = this.queryById(id);
        if (taskDTO == null) {
            logger.error("deleteTaskNode fail, task={} not found", id);
            return false;
        }
        String parentId = taskDTO.getParentId();
        String childId = taskDTO.getChildId();
        if (StringUtils.isNotEmpty(parentId)) {
            for (String singleId : parentId.split(",")) {
                TaskDTO dto = this.queryById(Long.parseLong(singleId));
                // Same treatment as for a missing child below.
                if (dto == null) {
                    return false;
                }
                dto.setChildId(removeIds(dto.getChildId(), id.toString()));
                if (!this.update(dto)) {
                    logger.error("update task={} fail!!!", dto.getId());
                    return false;
                }
                this.updateTaskNode(dto.getPipelineId(), dto, "update");
            }
        }
        if (StringUtils.isNotEmpty(childId)) {
            for (String singleId : childId.split(",")) {
                TaskDTO dto = this.queryById(Long.parseLong(singleId));
                if (dto == null) {
                    return false;
                }
                dto.setParentId(removeIds(dto.getParentId(), id.toString()));
                if (!this.update(dto)) {
                    logger.error("update task={} fail!!!", dto.getId());
                    return false;
                }
                this.updateTaskNode(dto.getPipelineId(), dto, "update");
            }
        }
        try {
            this.updateTaskNode(taskDTO.getPipelineId(), taskDTO, "delete");
            this.dropGenerateTables(taskDTO);
            this.delete(id);
            this.updateTaskName(taskDTO, "delete");
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return false;
        }
        return true;
    }

    /**
     * Checks whether two nodes are siblings, i.e. share at least one parent id.
     *
     * @param a the first node
     * @param b the second node
     * @return {@code true} when both nodes have parent ids and at least one id is common
     */
    public boolean isBrotherNode(TaskDTO a, TaskDTO b) {
        String parentsOfA = a.getParentId();
        String parentsOfB = b.getParentId();
        if (StringUtils.isEmpty(parentsOfA) || StringUtils.isEmpty(parentsOfB)) {
            return false;
        }
        Set<String> shared = Arrays.stream(parentsOfA.split(",")).collect(Collectors.toSet());
        Set<String> other = Arrays.stream(parentsOfB.split(",")).collect(Collectors.toSet());
        // Keep only the ids that appear on both sides.
        shared.retainAll(other);
        return !shared.isEmpty();
    }

    /**
     * Deletes the connection between a parent node and a child node.
     *
     * <p>Each node's id is removed from the other's link list. Depending on the child's
     * type, derived state on the child is cleared as well, then both nodes are persisted
     * and passed to {@code updateTaskNode}.
     *
     * @param referParentId id of the parent end of the connection
     * @param referChildId  id of the child end of the connection
     * @return {@code false} when persisting either node fails, {@code true} otherwise
     */
    public boolean deleteLine(Long referParentId, Long referChildId) {
        TaskDTO upstream = this.queryById(referParentId);
        TaskDTO downstream = this.queryById(referChildId);

        String remainingChildren = removeIds(upstream.getChildId(), referChildId.toString());
        upstream.setChildId(remainingChildren);
        String remainingParents = removeIds(downstream.getParentId(), referParentId.toString());
        downstream.setParentId(remainingParents);

        if (TaskTypeEnum.TASK_TYPE_ETL.getVal().equals(downstream.getType())) {
            // May have several data sources (FILTER accepts only one), but the tables
            // still need to be dropped.
            dropGenerateTables(downstream);
            clearInOutput(downstream, upstream);
        }
        if (TaskTypeEnum.TASK_TYPE_GRAPH.getVal().equals(downstream.getType())) {
            // Several data sources, but tables are not dropped (merge with the ETL branch
            // above if that is needed later).
            clearInOutput(downstream, upstream);
        }
        if (TaskTypeEnum.TASK_TYPE_ALGO.getVal().equals(downstream.getType())) {
            // Accepts a single data source only; the attributes to clear differ and are
            // more numerous.
            clearAlgoOutput(downstream, upstream);
        }

        boolean bothSaved = this.update(upstream) && this.update(downstream);
        if (!bothSaved) {
            logger.error("deleteLine fail!!!");
            return false;
        }
        this.updateTaskNode(upstream.getPipelineId(), upstream, "update");
        this.updateTaskNode(downstream.getPipelineId(), downstream, "update");
        return true;
    }

    /**
     * Clears the results produced by a machine-learning operator on the child node.
     *
     * <p>{@code output}, {@code input}, {@code parentType} and {@code feature_cols} are
     * reset to empty arrays; a fixed list of further keys is removed when they hold a
     * non-null value. The result is written back to the child's dataJson.
     *
     * @param childNode  the node whose dataJson is rewritten
     * @param parentNode not read by this method
     */
    private void clearAlgoOutput(TaskDTO childNode, TaskDTO parentNode) {
        JSONObject childData = JSONObject.parseObject(childNode.getDataJson());

        // output can be emptied (unverified); input can be emptied directly because a
        // machine-learning operator accepts only one input.
        String[] emptiedKeys = {"output", "input", "parentType", "feature_cols"};
        for (String key : emptiedKeys) {
            childData.put(key, new JSONArray());
        }

        String[] removableKeys = {
            "out_table_rename",
            "source_table",
            "model_table_rename",
            "autoJoin",
            "isSample",
            "isSimple",
            "lastStatus",
            "newFeatures",
            "lastTimeStamp"
        };
        for (String key : removableKeys) {
            if (childData.get(key) != null) {
                childData.remove(key);
            }
        }

        childNode.setDataJson(childData.toJSONString());
    }

    /**
     * Resets every node that depends on {@code taskDTO} and, unless only a line is being
     * removed, deletes {@code taskDTO} itself.
     *
     * <p>The pipeline's tasks are scanned repeatedly: the target node and every node having
     * an already-collected node among its parents are collected, until a pass collects
     * nothing new. Each collected node then gets its parent id, {@code input} and
     * {@code output} cleared and is persisted; the target node is deleted instead when
     * {@code isLine} is {@code false}.
     *
     * @param taskDTO the node the operation starts from
     * @param isLine  {@code true} when a connection is being removed, so the target node
     *                is reset like its descendants instead of deleted
     * @return always {@code true}
     */
    public boolean deleteAssociateTask(TaskDTO taskDTO, boolean isLine) {
        List<TaskDTO> tasks = queryByPipeline(taskDTO.getPipelineId());
        Map<Long, TaskDTO> setNodes = new HashMap<>();
        int lastSize;
        do {
            lastSize = tasks.size();
            Iterator<TaskDTO> iterator = tasks.iterator();
            while (iterator.hasNext()) {
                TaskDTO dto = iterator.next();
                Long currentId = dto.getId();
                String parentIdStr = dto.getParentId();
                List<String> parentIds = new ArrayList<>();
                if (parentIdStr != null && !parentIdStr.isEmpty()) {
                    parentIds = Arrays.asList(parentIdStr.split(","));
                }
                if (currentId.equals(taskDTO.getId())) {
                    setNodes.put(currentId, dto);
                    iterator.remove();
                } else if (parentIds.size() > 0) {
                    for (String parentId : parentIds) {
                        Long pId = Long.parseLong(parentId);
                        if (setNodes.containsKey(pId)) {
                            setNodes.put(dto.getId(), dto);
                            iterator.remove();
                            // One collected parent is enough. Without this break a node
                            // with two collected parents would call iterator.remove()
                            // twice and fail with IllegalStateException.
                            break;
                        }
                    }
                }
            }
        } while (lastSize != tasks.size());
        for (TaskDTO dto : setNodes.values()) {
            if (!isLine && dto.getId().equals(taskDTO.getId())) {
                this.updateTaskNode(dto.getPipelineId(), dto, "delete");
                this.delete(dto.getId());
                this.updateTaskName(dto, "delete");
                continue;
            }
            TaskVO vo = dto.view();
            vo.setParentId("");
            // NOTE(review): the return value of load() is ignored here; presumably it is
            // called for its effect on vo. Confirm against load().
            this.load(vo);
            vo.getData().put("input", new JSONArray());
            vo.getData().put("output", new JSONArray());
            TaskDTO newDto = vo.toTask();
            adaptor(newDto);
            this.updateTaskName(newDto, "update");
            this.updateTaskNode(newDto.getPipelineId(), newDto, "update");
            taskMapper.update(newDto);
        }
        return true;
    }

    /**
     * Decides whether an update needs parameter validation. Moving, connecting and
     * disabling a node are not validated.
     *
     * @param dto the task whose dataJson is inspected
     * @return {@code false} when the data contains {@code position} (node moved) or has
     *         exactly one key (connect / disable send a single field); {@code true}
     *         otherwise
     */
    public boolean isNeedParamVerify(TaskDTO dto) {
        JSONObject payload = JSONObject.parseObject(dto.getDataJson());
        boolean isMove = payload.containsKey("position");
        boolean isSingleField = payload.keySet().size() == 1;
        return !(isMove || isSingleField);
    }

    /**
     * Validates the numeric configuration values of an operator.
     *
     * <p>The validator is the {@code ValidateConfigEnum} constant named after the task's
     * {@code algName}. Operators without such a constant, or without an {@code algName},
     * need no validation and are accepted.
     *
     * @param errorCode receives error codes reported by the validator
     * @param dto       the task whose dataJson is validated
     * @return the validator's result, or {@code true} when no validator applies
     */
    public boolean validateConfigParams(List<ApiResultCode> errorCode, TaskDTO dto) {
        JSONObject jsonObject = JSONObject.parseObject(dto.getDataJson());
        String algName = jsonObject.getString("algName");
        // Enum.valueOf throws NullPointerException (not IllegalArgumentException) for a
        // null name, so a missing algName has to be handled before the lookup.
        if (algName == null) {
            return true;
        }
        try {
            ValidateConfigEnum validateConfigEnum = ValidateConfigEnum.valueOf(algName);
            return validateConfigEnum.validate(jsonObject, errorCode);
        } catch (IllegalArgumentException e) {
            // No validator constant for this operator: nothing to validate.
            return true;
        }
    }

    /**
     * Repairs the output table name of a task whose name was generated before the task id
     * existed. When a Union node is created, its output is populated without a task id, so
     * the text {@code null} ends up in the table name.
     *
     * <p>When the task has an id, the first {@code _}-separated segment equal to
     * {@code "null"} in the first output's {@code tableName} is replaced by the id, the
     * name is rebuilt with a trailing {@code _}, and the task is persisted.
     *
     * @param dto the task to repair; its dataJson is rewritten when a fix was applied
     */
    public void validateTableName(TaskDTO dto) {
        JSONObject dataJson = JSONObject.parseObject(dto.getDataJson());
        JSONObject firstOutput = dataJson.getJSONArray("output").getJSONObject(0);
        if (dto.getId() == null) {
            return;
        }

        List<String> segments = Arrays.asList(firstOutput.getString("tableName").split("_"));
        int placeholderAt = segments.indexOf("null");
        if (placeholderAt < 0) {
            return;
        }

        segments.set(placeholderAt, String.valueOf(dto.getId()));
        String repairedName = Joiner.on("_").join(segments) + "_";
        firstOutput.put("tableName", repairedName);
        dto.setDataJson(dataJson.toJSONString());
        this.update(dto);
    }

//    public void clearActions(List<TaskInstanceDTO> actions, Long taskId) {
//        if (actions == null || actions.isEmpty()) {
//            return;
//        } else if (actions.size() > 1) {
//            Long id = actions.get(1).getId();//清理第二条生成的表
//            TaskInstanceDTO taskInstanceDTO = taskInstanceMapper.queryById(id);
//            String table = JSONObject.parseObject(taskInstanceDTO.getDataJson()).getString("table");
//            if (!table.split("\\.")[0].equals(GREEN_PLUM_DEFAULT_SCHEMA)) {
//                gpDataProvider.dropRedundantTables(table, true);
//            }
//        }
//        taskInstanceService.deleteActions(taskId, actions.get(0).getId());
//    }

    /**
     * Removes data generated by running an operator.
     *
     * <p>Only CLEAN tasks are handled: when the task has recorded actions, the actions are
     * deleted starting from the first one returned by {@code queryByTaskIdAndOrder}, and
     * the related widgets are deleted as well. Tables of other task types are deliberately
     * not dropped because redo / undo still needs them.
     *
     * @param taskDTO the task whose generated data is removed
     */
    @Transactional(rollbackFor = Exception.class)
    public void dropGenerateTables(TaskDTO taskDTO) {
        Long taskId = taskDTO.getId();
        // equals() instead of "==": both sides are boxed Integers, and "==" would compare
        // references.
        if (TaskTypeEnum.TASK_TYPE_CLEAN.getVal().equals(taskDTO.getType())) {
            List<TaskInstanceDTO> actions = taskInstanceService.queryByTaskIdAndOrder(taskId);
            if (CollectionUtil.isNotEmpty(actions)) {
                Long firstActionId = actions.get(0).getId();
                taskInstanceService.deleteActions(taskId, firstActionId);
                deleteWidgets(taskDTO.getProjectId(), taskId, firstActionId);
            }
        }
        // Needed for redo / undo: tables of ETL / ALGO / DEFINE tasks are not dropped here.
    }

    /**
     * Tells whether any child node depends on the node that is about to be deleted.
     *
     * <p>A dependency is reported only when the node has children and at least one task
     * instance exists for those children; children that never ran have produced no view and
     * therefore do not depend on this node.
     *
     * @param id id of the node to delete
     * @return {@code true} when a child with at least one task instance exists
     */
    public boolean checkChildDependOn(Long id) {
        List<Long> children = taskMapper.queryIdByParentId(id);
        if (!CollectionUtil.isEmpty(children)) {
            // No run instance means no generated view, hence no dependency.
            List<Long> childInstances = taskInstanceService.queryIdByTaskId(children);
            return !CollectionUtil.isEmpty(childInstances);
        }
        // No child node hangs off this one.
        return false;
    }

    /**
     * Shortcut for {@link #clearInOutput(TaskDTO, TaskDTO, Boolean)} that also resets the
     * child's {@code output}.
     *
     * @param childNode  node whose link to {@code parentNode} was removed
     * @param parentNode the former parent
     */
    public void clearInOutput(TaskDTO childNode, TaskDTO parentNode) {
        Boolean alsoClearOutput = Boolean.TRUE;
        clearInOutput(childNode, parentNode, alsoClearOutput);
    }

    /**
     * Detaches {@code childNode} from {@code parentNode} after the connecting line is deleted.
     *
     * <p>The child is only modified in memory (its parent id list and its data JSON); this
     * method does not save it. In the data JSON:
     * <ul>
     *   <li>{@code output} is reset to an empty array when {@code clearOutput} is true;</li>
     *   <li>{@code columnsFilter} is reset to an empty array when the key is present;</li>
     *   <li>the first {@code input} entry whose {@code nodeName} equals the parent's name is
     *       removed;</li>
     *   <li>{@code parentType} is emptied when it holds exactly one element; otherwise the
     *       element at the position of the removed input is dropped.</li>
     * </ul>
     *
     * @param childNode   node to update
     * @param parentNode  parent being disconnected
     * @param clearOutput whether to reset {@code output}; {@code null} is treated as false
     */
    public void clearInOutput(TaskDTO childNode, TaskDTO parentNode, Boolean clearOutput) {
        String newParentIds = this
                .removeIds(childNode.getParentId(), parentNode.getId().toString());
        JSONObject childJsonObject = JSONObject.parseObject(childNode.getDataJson());
        childNode.setParentId(newParentIds);
        if (Boolean.TRUE.equals(clearOutput)) {
            childJsonObject.put("output", new JSONArray());
        }

        // FILTER nodes record their selected columns in columnsFilter; reset it when present.
        if (childJsonObject.containsKey("columnsFilter")) {
            childJsonObject.put("columnsFilter", new JSONArray());
        }

        // Remove only the input that originates from parentNode, remembering its position so
        // the matching parentType entry can be removed as well.
        String parentNodeName = parentNode.getName();
        JSONArray childInputArray = childJsonObject.getJSONArray("input");
        int removedInputIndex = -1;
        if (childInputArray != null) {
            for (int i = 0; i < childInputArray.size(); i++) {
                JSONObject input = childInputArray.getJSONObject(i);
                if (input != null && parentNodeName != null
                        && parentNodeName.equals(input.getString("nodeName"))) {
                    childInputArray.remove(i);
                    removedInputIndex = i;
                    break;
                }
            }
            childJsonObject.put("input", childInputArray);
        }

        JSONArray childsParentType = childJsonObject.getJSONArray("parentType");
        if (childsParentType != null) {
            if (childsParentType.size() == 1) {
                // Single source: simply empty the array.
                childJsonObject.put("parentType", new JSONArray());
            } else if (removedInputIndex >= 0 && removedInputIndex < childsParentType.size()) {
                // Several sources: drop the entry paired with the removed input. The bounds
                // check avoids the IndexOutOfBoundsException seen after undo, when no input
                // matched or parentType is shorter than input.
                childsParentType.remove(removedInputIndex);
                childJsonObject.put("parentType", childsParentType);
            }
        }

        childNode.setDataJson(childJsonObject.toJSONString());
    }

    /**
     * Deletes the charts (widgets) that belong to the cleaning history of a task.
     *
     * <p>A widget qualifies when its {@code formData.dataType} equals
     * {@code ColumnConstant.TYPE_CLEAN} and its {@code formData.dataId} is greater than or
     * equal to {@code actionId}. Both the task's template widgets and the grid items of the
     * project's dashboard layout are scanned. Entries without {@code formData}, without a
     * {@code dataId} or (for grid items) without a {@code widgetId} are skipped.
     *
     * @param projectId project whose dashboard layout is scanned
     * @param taskId    task whose template widgets are scanned
     * @param actionId  lower bound (inclusive) of the action ids whose widgets are removed
     */
    @Transactional
    public void deleteWidgets(Long projectId, Long taskId, Long actionId) {
        List<WidgetDTO> widgetDTOS = widgetService.queryTemplateByTaskId(taskId);
        if (CollectionUtil.isNotEmpty(widgetDTOS)) {
            for (WidgetDTO widget : widgetDTOS) {
                JSONObject widgetData = JSONObject.parseObject(widget.getDataJson());
                JSONObject formData = widgetData == null ? null : widgetData.getJSONObject("formData");
                if (null == formData) {
                    continue;
                }
                // getLong returns null for a missing key; guard before the numeric compare.
                Long dataId = formData.getLong("dataId");
                if (ColumnConstant.TYPE_CLEAN.equals(formData.getString("dataType"))
                        && dataId != null && dataId >= actionId) {
                    widgetService.delete(widget.getId());
                }
            }
        }
        DashboardDTO dashboardDTO = dashboardService.queryByProjectId(projectId);
        if (null == dashboardDTO) {
            return;
        }
        JSONObject layOut = JSONObject.parseObject(dashboardDTO.getLayout());
        if (null == layOut) {
            return;
        }
        JSONArray gridItems = layOut.getJSONArray("gridItems");
        if (CollectionUtil.isNotEmpty(gridItems)) {
            for (int i = 0; i < gridItems.size(); i++) {
                JSONObject gridItem = gridItems.getJSONObject(i);
                JSONObject formData = gridItem == null ? null : gridItem.getJSONObject("formData");
                if (null == formData) {
                    continue;
                }
                Long dataId = formData.getLong("dataId");
                Long widgetId = gridItem.getLong("widgetId");
                if (ColumnConstant.TYPE_CLEAN.equals(formData.getString("dataType"))
                        && dataId != null && dataId >= actionId && widgetId != null) {
                    widgetService.delete(widgetId);
                }
            }
        }
    }

    /**
     * Computes the bounding rectangle of the given tasks from their {@code position.row}
     * and {@code position.col} values.
     *
     * <p>The search starts from (10000, 10000) for the top-left corner and (0, 0) for the
     * bottom-right corner, so those values are returned unchanged for an empty list.
     *
     * @param tasks tasks to measure
     * @return a two-element list: index 0 is (min row, min col), index 1 is (max row, max col);
     *         each pair holds the row as key and the column as value
     */
    public List<Pair<Integer, Integer>> getRectanglePoint(List<TaskDTO> tasks) {
        int minRow = 10000;
        int minCol = 10000;
        int maxRow = 0;
        int maxCol = 0;

        for (TaskDTO task : tasks) {
            JSONObject position = task.view().getData().getJSONObject("position");
            int row = position.getInteger("row");
            int col = position.getInteger("col");
            minRow = Math.min(minRow, row);
            minCol = Math.min(minCol, col);
            maxRow = Math.max(maxRow, row);
            maxCol = Math.max(maxCol, col);
        }

        List<Pair<Integer, Integer>> corners = new ArrayList<>();
        corners.add(Pair.of(minRow, minCol));
        corners.add(Pair.of(maxRow, maxCol));
        return corners;
    }

    /**
     * Moves a group of copied tasks to its target location on the canvas (the original note
     * says the layout rule is modeled after Tableau).
     *
     * <p>Every task in {@code tasks} is shifted by the same offset: the top row of the copied
     * group lands one row below the bottom row of {@code currentNodes}, and its left-most
     * column is aligned with the left-most column of {@code currentNodes}. The new position is
     * written back into each task's data JSON.
     *
     * @param tasks        copied tasks to reposition (modified in place)
     * @param currentNodes tasks already present, used as the reference rectangle
     * @return always {@code true}
     */
    public boolean batchPositionCalculateForCopy(List<TaskDTO> tasks, List<TaskDTO> currentNodes) {
        List<Pair<Integer, Integer>> copiedBounds = this.getRectanglePoint(tasks);
        List<Pair<Integer, Integer>> existingBounds = this.getRectanglePoint(currentNodes);

        Pair<Integer, Integer> existingTopLeft = existingBounds.get(0);
        Pair<Integer, Integer> existingBottomRight = existingBounds.get(1);
        Pair<Integer, Integer> copiedTopLeft = copiedBounds.get(0);

        int rowStep = existingBottomRight.getKey() - copiedTopLeft.getKey() + 1;
        int colStep = existingTopLeft.getValue() - copiedTopLeft.getValue();

        for (TaskDTO task : tasks) {
            JSONObject data = task.view().getData();
            JSONObject position = data.getJSONObject("position");
            int shiftedRow = position.getInteger("row") + rowStep;
            int shiftedCol = position.getInteger("col") + colStep;
            position.put("row", shiftedRow);
            position.put("col", shiftedCol);
            data.put("position", position);
            task.setDataJson(data.toJSONString());
        }

        return true;
    }

    /**
     * Runs {@link #batchProduce(List, Long, boolean)} without an explicitly assigned pipeline.
     *
     * @param tasks  tasks to persist
     * @param isCopy {@code true} for a copy, {@code false} for a redo/undo re-insertion
     * @return the result of the delegated call
     */
    public boolean batchProduce(List<TaskDTO> tasks, boolean isCopy) {
        Long noAssignedPipeline = null;
        return batchProduce(tasks, noAssignedPipeline, isCopy);
    }

    /**
     * Indexes the already existing tasks by their id.
     *
     * @param existTasks tasks currently stored for the pipeline
     * @return a mutable map from task id to task
     */
    private Map<Long, TaskDTO> initUtilMap(List<TaskDTO> existTasks) {
        Map<Long, TaskDTO> tasksById = new HashMap<>();
        existTasks.forEach(existing -> tasksById.put(existing.getId(), existing));
        return tasksById;
    }

    /**
     * Saves every task and records which new id each original id was mapped to.
     *
     * <p>When copying, the id is cleared before saving so a fresh record is created; the task
     * is additionally passed to {@code updateTaskName(task, "copy")} when
     * {@code isProjectCopy} is set. Each saved task receives its new id and, if
     * {@code utilMap} is supplied, is registered there under that id.
     *
     * <p>NOTE(review): the original comment says a whole-project copy does not need renaming,
     * yet the rename runs when {@code isProjectCopy} is true. The caller sets that flag when
     * no pipeline id was assigned, so the flag name looks inverted; confirm the intent.
     *
     * @param tasks         tasks to save (their ids are overwritten with the saved ids)
     * @param utilMap       optional id index that is extended with the saved tasks
     * @param isCopy        whether the tasks are being copied
     * @param isProjectCopy whether copied tasks should also be renamed
     * @return map from original task id to saved task id
     */
    private Map<Long, Long> initRelationShipMap(List<TaskDTO> tasks, Map<Long, TaskDTO> utilMap, boolean isCopy, boolean isProjectCopy) {
        Map<Long, Long> oldToNewIds = new HashMap<>();
        for (TaskDTO task : tasks) {
            final Long originalId = task.getId();
            if (isCopy) {
                task.setId(null);
            }
            if (isCopy && isProjectCopy) {
                this.updateTaskName(task, "copy");
            }
            final Long savedId = this.simpleSave(task);
            task.setId(savedId);
            if (utilMap != null) {
                utilMap.put(savedId, task);
            }
            oldToNewIds.put(originalId, savedId);
        }
        return oldToNewIds;
    }

    /**
     * Persists a batch of tasks and rewires their parent/child links, either as a copy
     * ({@code isCopy == true}) or as a redo/undo re-insertion ({@code isCopy == false}).
     *
     * <p>All tasks are first saved through {@code initRelationShipMap}, which yields a map
     * from original id to saved id. In copy mode links to nodes outside the batch are dropped
     * and links inside the batch are translated to the new ids; type-specific clean-up is then
     * applied before each node is updated. In redo/undo mode links are translated where
     * needed, kept otherwise, and the already existing neighbours are updated so that they
     * point back at the re-inserted nodes.
     *
     * <p>NOTE(review): failures are reported by returning {@code false} rather than by
     * rethrowing, so {@code rollbackFor} presumably never triggers and earlier writes of the
     * batch stay committed; confirm this is intended.
     *
     * @param tasks              tasks to persist; their ids (and possibly names) are modified
     * @param assignedPipelineId pipeline whose existing tasks are loaded; when {@code null}
     *                           the pipeline of the first task is used and copied nodes are
     *                           additionally renamed and repositioned
     * @param isCopy             {@code true} for copy, {@code false} for redo/undo
     * @return {@code false} when {@code tasks} is null or empty or when an update fails,
     *         {@code true} otherwise
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean batchProduce(List<TaskDTO> tasks, Long assignedPipelineId, boolean isCopy) {
        if (tasks == null || tasks.isEmpty()) {
            return false;
        }
        boolean projectCopyFlag = false;
        TaskDTO taskDTO = tasks.get(0);
        if (null == assignedPipelineId) {
            projectCopyFlag = true;
            assignedPipelineId = taskDTO.getPipelineId();
        }
        List<TaskDTO> existTasks = this.queryByPipeline(assignedPipelineId);
        Map<Long, TaskDTO> utilMap = initUtilMap(existTasks);

        // <oldTaskId, newTaskId>: original node -> node created from it
        Map<Long, Long> relationMap = initRelationShipMap(tasks, utilMap, isCopy, projectCopyFlag);
        taskUtil.getRelationMap().put(assignedPipelineId, relationMap);

        // <newTaskId, oldTaskId>
        Map<Long, Long> reverseRelationMap = MapUtil.reverseMap(relationMap);

        // update relationship
        if (isCopy) {
            // Batch copy: links to nodes outside the copied set are dropped, links between
            // copied nodes are kept and translated to the new ids.
            List<TaskDTO> nodes = new ArrayList<>();
            for (TaskDTO task : tasks) {
                String parentIdStr = task.getParentId();
                String childIdStr = task.getChildId();
                TaskDTO dto = new TaskDTO();
                dto.setType(task.getType());
                dto.setId(task.getId());
                dto.setOperatorId(task.getOperatorId());
                dto.setDataJson(task.getDataJson());
                if (StringUtils.isNotEmpty(parentIdStr)) {
                    String[] tmpIds = parentIdStr.split(",");
                    List<String> newIds = new ArrayList<>();
                    for (String id : tmpIds) {
                        if (relationMap.containsKey(Long.parseLong(id))) {
                            newIds.add(relationMap.get(Long.parseLong(id)).toString());
                        }
                    }
                    dto.setParentId(Joiner.on(",").join(newIds));
                }
                if (StringUtils.isNotEmpty(childIdStr)) {
                    String[] tmpIds = childIdStr.split(",");
                    List<String> newIds = new ArrayList<>();
                    for (String id : tmpIds) {
                        if (relationMap.containsKey(Long.parseLong(id))) {
                            newIds.add(relationMap.get(Long.parseLong(id)).toString());
                        }
                    }
                    dto.setChildId(Joiner.on(",").join(newIds));
                }
                nodes.add(dto);
            }
            if (projectCopyFlag) {
                // Original comment: a whole-project copy does not need its nodes moved, so
                // repositioning presumably only applies to copies made inside one pipeline.
                this.batchPositionCalculateForCopy(nodes, existTasks);
            }
            // Apply type-specific clean-up and persist each copied node.
            for (TaskDTO task : nodes) {
                try {
                    JSONObject jsonConf = task.view().getData();
                    int type = task.getType();
                    int algType =
                            jsonConf.containsKey("algType") ? jsonConf.getInteger("algType") : 0;
                    // copy node inside a project
                    if (projectCopyFlag && TaskTypeEnum.TASK_TYPE_JLAB.getVal() == type) {
                        TaskDTO oldtaskDTO = this.queryById(reverseRelationMap.get(task.getId()));
                        // NOTE(review): the name passed is always that of the first task in
                        // the batch, not of the node being copied; confirm this is intended.
                        jlabService.copyPaste(oldtaskDTO, taskDTO.getName());
                    }
                    if (TaskTypeEnum.TASK_TYPE_ALGO.getVal() == type) {
                        // Copying a generic / system operator.
                        JSONObject oldJson = JSONObject.parseObject(task.getDataJson());
                        oldJson = clearLastRunningResult(oldJson);
                        oldJson.put("output", new JSONArray());
                        oldJson.put("parentType", new JSONArray());
                        // Rebuild out_table_rename so that it ends with the new task id.
                        String oldTableName = oldJson.getString("out_table_rename");
                        if (StringUtils.isNotEmpty(oldTableName)) {
                            String[] parts = oldTableName.split("_");
                            parts[parts.length - 1] = task.getId().toString();
                            String newTable = Joiner.on("_").join(parts) + "_";
                            oldJson.put("out_table_rename", newTable);
                            // NOTE(review): the cleaned JSON is stored only inside this
                            // branch, so operators without out_table_rename keep their old
                            // output / parentType; confirm whether that is intended.
                            task.setDataJson(oldJson.toJSONString());
                        }
                        // model_table_rename is deliberately left untouched (changing it
                        // caused errors according to the original author).
                    }
                    if (TaskTypeEnum.TASK_TYPE_ETL.getVal() == type) {
                        if ((ETLEnum.JOIN.getVal() == algType || ETLEnum.UNION.getVal() == algType) &&
                                StringUtils.isEmpty(task.getParentId())) {
                            // A JOIN/UNION copied without any parent loses its recommendations,
                            // conditions and inputs.
                            jsonConf.remove("autoJoinRecommendation");
                            jsonConf.remove("autoUnionRecommendation");
                            jsonConf.remove("autoJoin");
                            jsonConf.put("conditions", new JSONArray());
                            jsonConf.put("input", new JSONArray());
                            task.setDataJson(jsonConf.toJSONString());
                        }
                    }
                    if (TaskTypeEnum.TASK_TYPE_CLEAN.getVal().equals(task.getType())) {
                        // Load the cleaning history of the original node.
                        List<TaskInstanceDTO> actions = tColumnService
                                .queryByTaskIdAndOrder(reverseRelationMap.get(task.getId()));
                        if (actions.size() == 0) {
                            // NOTE(review): this also skips this.update(task) below, so the
                            // translated parent/child ids of such a node are never stored.
                            continue;
                        }
                        JSONObject actionData = JSONObject.parseObject(actions.get(0).getDataJson());
                        for (TaskInstanceDTO action : actions) {
                            action.setParentId(String.valueOf(task.getId()));
                            action.setTaskId(task.getId());
                            // NOTE(review): the pipeline id is set to the task id here;
                            // verify against rebuildActions.
                            action.setPipelineId(task.getId());
                            action.setId(null);
                        }
                        tColumnService
                                .rebuildActions(actions, task.getId(), actionData.getString("table"), !projectCopyFlag);

                        // Reset the inputs; per the original comment they are kept for a
                        // whole-project copy.
                        JSONObject currJson = JSONObject.parseObject(task.getDataJson());
                        if (projectCopyFlag) {
                            currJson.put("input", new JSONArray());
                            currJson.put("parentType", new JSONArray());
                        }
                        task.setDataJson(currJson.toJSONString());
                    }

                    this.update(task);
                } catch (Exception e) {
                    // The throwable is passed as the last argument so the stack trace is logged.
                    logger.error("batch copy task failed, error occured when copy task id:{}, old id:{}, since = {}",
                            task.getId(), reverseRelationMap.get(task.getId()), e.getMessage(), e);
                    this.clearTaskUtil(assignedPipelineId);
                    return false;
                }
            }
        } else {
            // redo / undo: batch re-insertion

            Set<Long> needUpdateForExists = new HashSet<>();

            for (TaskDTO task : tasks) {
                Long key = task.getId();
                String parentIds = task.getParentId();
                String childIds = task.getChildId();
                boolean isNeedUpdate = false;
                TaskDTO dto = new TaskDTO();
                dto.setId(key);
                if (StringUtils.isNotEmpty(parentIds)) {
                    String[] tmpIds = parentIds.split(",");
                    List<String> newIds = new ArrayList<>();
                    boolean isModify = false;
                    for (String id : tmpIds) {
                        if (relationMap.containsKey(Long.parseLong(id))) {
                            isModify = true;
                            newIds.add(relationMap.get(Long.parseLong(id)).toString());
                        } else {
                            newIds.add(id);
                        }
                    }
                    if (isModify) {
                        dto.setParentId(Joiner.on(",").join(newIds));
                        isNeedUpdate = true;
                    }
                    // Make every known parent point back at this node.
                    for (String id : newIds) {
                        TaskDTO parentDto = utilMap.getOrDefault(Long.parseLong(id), null);
                        if (parentDto != null) {
                            needUpdateForExists.add(Long.parseLong(id));
                            parentDto
                                    .setChildId(this.appendIds(parentDto.getChildId(), key.toString()));
                        }
                    }
                }
                if (StringUtils.isNotEmpty(childIds)) {
                    String[] tmpIds = childIds.split(",");
                    List<String> newIds = new ArrayList<>();
                    boolean isModify = false;
                    for (String id : tmpIds) {
                        if (relationMap.containsKey(Long.parseLong(id))) {
                            isModify = true;
                            newIds.add(relationMap.get(Long.parseLong(id)).toString());
                        } else {
                            newIds.add(id);
                        }
                    }
                    if (isModify) {
                        dto.setChildId(Joiner.on(",").join(newIds));
                        isNeedUpdate = true;
                    }
                    // Make every known child point back at this node.
                    for (String id : newIds) {
                        TaskDTO childDto = utilMap.getOrDefault(Long.parseLong(id), null);
                        if (childDto != null) {
                            needUpdateForExists.add(Long.parseLong(id));
                            childDto.setParentId(
                                    this.appendIds(childDto.getParentId(), key.toString()));
                        }
                    }
                }
                if (isNeedUpdate) {
                    try {
                        this.update(dto);
                    } catch (Exception e) {
                        logger.error("batch add task failed, error occured when update task id:{}",
                                key, e);
                        this.clearTaskUtil(assignedPipelineId);
                        return false;
                    }
                }
            }

            for (Long id : needUpdateForExists) {
                try {
                    this.update(utilMap.get(id));
                } catch (Exception e) {
                    logger.error("batch add task failed, error occured when update related task id:{}",
                            id, e);
                    this.clearTaskUtil(assignedPipelineId);
                    return false;
                }
            }
        }
        this.clearTaskUtil(assignedPipelineId);
        return true;
    }

    /**
     * Strips the keys {@code isSample}, {@code isSimple}, {@code lastStatus},
     * {@code lastTimeStamp} and {@code newFeatures} from the given configuration.
     *
     * @param oldJson configuration to clean (modified in place)
     * @return the same {@code oldJson} instance
     */
    private JSONObject clearLastRunningResult(JSONObject oldJson) {
        String[] staleKeys = {"isSample", "isSimple", "lastStatus", "lastTimeStamp", "newFeatures"};
        for (String staleKey : staleKeys) {
            // Removing an absent key is a no-op, so no containsKey check is needed.
            oldJson.remove(staleKey);
        }
        return oldJson;
    }

    /**
     * Extracts the user-written code from the script of an existing custom operator.
     *
     * <p>Each entry of the script's {@code paragraphs} becomes an object holding only its
     * {@code text}; the first one additionally gets a generated
     * {@code operator_copy_<millis>} title.
     *
     * @param scriptBody JSON text of the script
     * @return array with one object per paragraph
     */
    private JSONArray extractParagraphs(String scriptBody) {
        String rawParagraphs = JSONObject.parseObject(scriptBody).getString("paragraphs");
        List<JSONObject> paragraphs = JSONArray.parseArray(rawParagraphs, JSONObject.class);
        JSONArray extracted = new JSONArray();
        boolean isFirst = true;
        for (JSONObject paragraph : paragraphs) {
            JSONObject entry = new JSONObject();
            if (isFirst) {
                entry.put("title", String.format("operator_copy_%s", System.currentTimeMillis()));
                isFirst = false;
            }
            entry.put("text", paragraph.getString("text"));
            extracted.add(entry);
        }
        return extracted;
    }

    /**
     * Copies the given tasks by delegating to {@link #batchProduce(List, Long, boolean)} in
     * copy mode.
     *
     * <p>NOTE(review): this is a call on {@code this}; if transactions are applied through
     * Spring proxies, the {@code @Transactional} on {@code batchProduce} would not take effect
     * for this path. Confirm the configuration.
     *
     * @param tasks      tasks to copy
     * @param pipelineId pipeline id handed to {@code batchProduce}
     * @return the result of {@code batchProduce}
     */
    public boolean batchCopy(List<TaskDTO> tasks, Long pipelineId) {
        final boolean copyMode = true;
        return batchProduce(tasks, pipelineId, copyMode);
    }

    /**
     * Adds a batch of tasks; used by the redo and undo operations.
     *
     * @param tasks tasks to insert
     * @return the result of {@link #batchProduce(List, boolean)} in non-copy mode
     */
    public boolean batchAdd(List<TaskDTO> tasks) {
        final boolean copyMode = false;
        return batchProduce(tasks, copyMode);
    }


    /**
     * Deletes a batch of nodes.
     *
     * <p>For every id that resolves to a task: the task is appended to {@code deletedTasks},
     * it is removed from the child list of each surviving parent, each surviving child is
     * detached from it (inputs cleared, output kept), a graph record is deleted for graph
     * nodes, restore information is collected into {@code context}, generated artifacts are
     * cleaned up and finally the task itself is deleted. Neighbours that are part of the same
     * batch are not updated. Afterwards the widgets of all ids are deleted and the task
     * utility state of the pipeline is cleared.
     *
     * @param context      collector that receives the data gathered by
     *                     {@code deleteLineInitContext}
     * @param ids          ids of the tasks to delete
     * @param deletedTasks output list that receives the deleted tasks
     * @param pipelineId   pipeline passed to {@code clearTaskUtil}
     */
    @Transactional(rollbackFor = Exception.class)
    public void batchDelete(JSONObject context, List<Long> ids, List<TaskDTO> deletedTasks,
                            Long pipelineId) {
        Set<Long> batchIds = new HashSet<>(ids);
        for (Long id : ids) {
            TaskDTO dto = this.queryById(id);
            if (dto == null) {
                continue;
            }
            deletedTasks.add(dto);

            // Update the (possibly several) parents that survive the batch.
            String parentIds = dto.getParentId();
            if (StringUtils.isNotEmpty(parentIds)) {
                for (String parId : parentIds.split(",")) {
                    Long parentId = Long.valueOf(parId);
                    if (batchIds.contains(parentId)) {
                        continue;
                    }
                    TaskDTO parentNode = this.queryById(parentId);
                    if (parentNode == null) {
                        continue;
                    }
                    TaskDTO parentPatch = new TaskDTO();
                    parentPatch.setId(parentNode.getId());
                    parentPatch.setChildId(this.removeIds(parentNode.getChildId(), id.toString()));
                    this.update(parentPatch);
                }
            }

            // Update the (possibly several) children that survive the batch.
            String childIds = dto.getChildId();
            if (StringUtils.isNotEmpty(childIds)) {
                for (String chId : childIds.split(",")) {
                    Long childId = Long.valueOf(chId);
                    if (batchIds.contains(childId)) {
                        continue;
                    }
                    TaskDTO childNode = this.queryById(childId);
                    if (childNode != null) {
                        this.clearInOutput(childNode, dto, Boolean.FALSE);
                        this.update(childNode);
                    }
                }
            }

            // Graph-building nodes also own a graph record.
            if (TaskTypeEnum.TASK_TYPE_GRAPH.getVal().equals(dto.getType())) {
                graphService.deleteByTaskId(id);
            }
            deleteLineInitContext(context, id);
            this.dropGenerateTables(dto);
            this.delete(id);
        }
        // Remove the widgets attached to the deleted nodes.
        widgetService.deleteByTasks(ids);
        this.clearTaskUtil(pipelineId);
    }

    /**
     * Collects a node together with all of its descendants or all of its ancestors.
     *
     * <p>The graph is walked breadth-first through the comma-separated child ids (or parent
     * ids). Every node is visited at most once, so a node reachable over several paths is
     * reported a single time and a cyclic link cannot cause an endless loop. Nodes listed in
     * {@code excludeIds} are still traversed but not added to the result. Ids that do not
     * resolve to a task are ignored.
     *
     * @param id           id of the start node
     * @param resultNodes  output list receiving a {@code cloneNew()} copy of every found node
     * @param excludeIds   ids to leave out of the result; may be {@code null}
     * @param isDescendant {@code true} to follow children, {@code false} to follow parents
     * @return always {@code true}
     */
    public boolean findDescendantsOrAncestors(Long id, List<TaskDTO> resultNodes,
                                              Set<Long> excludeIds, boolean isDescendant) {
        Queue<Long> pending = new ArrayDeque<>();
        Set<Long> visited = new HashSet<>();
        pending.add(id);
        visited.add(id);
        while (!pending.isEmpty()) {
            Long currentId = pending.poll();
            TaskDTO task = this.queryById(currentId);
            if (task == null) {
                continue;
            }
            String linkedIds = isDescendant ? task.getChildId() : task.getParentId();
            if (StringUtils.isNotEmpty(linkedIds)) {
                for (String linked : linkedIds.split(",")) {
                    Long linkedId = Long.parseLong(linked);
                    // Set.add returns false for ids that were already scheduled.
                    if (visited.add(linkedId)) {
                        pending.add(linkedId);
                    }
                }
            }
            if (excludeIds != null && excludeIds.contains(currentId)) {
                continue;
            }
            resultNodes.add(task.cloneNew());
        }
        return true;
    }

    /**
     * Records in {@code context} what belongs to a task before it is deleted.
     *
     * <p>For cleaning nodes the ordered actions are stored under {@code actions[taskId]}, the
     * widgets tied to those actions are appended to {@code widgets}, and the table named in
     * the second action (if any) is appended to {@code views}. For ETL and algorithm nodes the
     * node's widgets are appended to {@code widgets} and the table names extracted from its
     * successful instances are appended to {@code views} or {@code tables}. Other task types,
     * and ids that do not resolve to a task, leave {@code context} untouched.
     *
     * @param context collector that is filled in place
     * @param taskId  id of the task about to be deleted
     */
    public void deleteLineInitContext(JSONObject context, Long taskId) {
        TaskDTO taskDTO = this.queryById(taskId);
        if (taskDTO == null) {
            return;
        }
        // equals() instead of ==: the type values may be boxed Integers.
        Integer taskType = taskDTO.getType();
        if (TaskTypeEnum.TASK_TYPE_CLEAN.getVal().equals(taskType)) {
            List<TaskInstanceDTO> actions = taskInstanceService.queryByTaskIdAndOrder(taskId);
            JSONObject contextActions = context.getJSONObject("actions");
            if (CollectionUtil.isNotEmpty(contextActions)) {
                contextActions.put(String.valueOf(taskId), actions);
            } else {
                JSONObject obj = new JSONObject();
                obj.put(String.valueOf(taskId), actions);
                context.put("actions", obj);
            }
            if (CollectionUtil.isNotEmpty(actions)) {
                // Append instead of overwrite, so widgets collected for other nodes of the
                // same batch are not lost.
                List<WidgetDTO> cleanWidgets = extractWidgets(taskId, actions.get(0).getId());
                JSONArray ctxWidgets = context.getJSONArray("widgets");
                if (CollectionUtil.isEmpty(ctxWidgets)) {
                    context.put("widgets", cleanWidgets);
                } else {
                    ctxWidgets.addAll(cleanWidgets);
                    // Written back in case getJSONArray returned a converted copy.
                    context.put("widgets", ctxWidgets);
                }
                // Remember the table of the second action so it can be dropped later.
                if (actions.size() > 1) {
                    Long id = actions.get(1).getId();
                    TaskInstanceDTO taskInstanceDTO = taskInstanceService.queryById(id);
                    String table = JSONObject.parseObject(taskInstanceDTO.getDataJson())
                            .getString("table");
                    JSONArray array = context.getJSONArray("views");
                    if (CollectionUtil.isNotEmpty(array)) {
                        array.add(table);
                    } else {
                        JSONArray views = new JSONArray();
                        views.add(table);
                        context.put("views", views);
                    }
                }
            }
        } else if (TaskTypeEnum.TASK_TYPE_ETL.getVal().equals(taskType) ||
                TaskTypeEnum.TASK_TYPE_ALGO.getVal().equals(taskType)) {
            // Keep the widgets that hang off the node.
            List<WidgetDTO> widgets = widgetService.queryByTaskIdAndType(taskId);
            JSONArray ctxWidgets = context.getJSONArray("widgets");
            if (CollectionUtil.isEmpty(ctxWidgets)) {
                context.put("widgets", JSONArray.toJSON(widgets));
            } else if (widgets != null) {
                ctxWidgets.addAll(widgets);
                context.put("widgets", ctxWidgets);
            }
            // Keep the tables / views produced by the node: load its successful instances.
            List<TaskInstanceDTO> taskInstances = taskInstanceService
                    .querySuccessInstanceByTaskId(taskId);
            if (ObjectUtil.isEmpty(taskInstances)) {
                return;
            }
            // Extract the table names.
            List<String> tables = new ArrayList<>();
            boolean isView = PipelineUtil.extractTableName(taskInstances, tables);
            if (ObjectUtil.isNotEmpty(tables)) {
                String key = isView ? "views" : "tables";
                JSONArray array = context.getJSONArray(key);
                if (CollectionUtil.isNotEmpty(array)) {
                    array.addAll(tables);
                } else {
                    context.put(key, JSONArray.toJSON(tables));
                }
            }
        }
    }

    /**
     * Selects the template widgets of a task that belong to its cleaning history.
     *
     * <p>A widget is selected when its {@code formData.dataType} equals
     * {@code ColumnConstant.TYPE_CLEAN} and its {@code formData.dataId} is greater than or
     * equal to {@code actionId}. Widgets without parsable data, without {@code formData} or
     * without a {@code dataId} are skipped.
     *
     * @param taskId   task whose template widgets are inspected
     * @param actionId lower bound (inclusive) for {@code formData.dataId}
     * @return the matching widgets, possibly empty, never {@code null}
     */
    private List<WidgetDTO> extractWidgets(Long taskId, Long actionId) {
        List<WidgetDTO> matched = new ArrayList<>();
        List<WidgetDTO> widgetDTOS = widgetService.queryTemplateByTaskId(taskId);
        if (CollectionUtil.isNotEmpty(widgetDTOS)) {
            for (WidgetDTO widget : widgetDTOS) {
                JSONObject widgetData = JSONObject.parseObject(widget.getDataJson());
                JSONObject formData = widgetData == null ? null : widgetData.getJSONObject("formData");
                if (null == formData) {
                    continue;
                }
                // getLong returns null for a missing key; guard before the numeric compare.
                Long dataId = formData.getLong("dataId");
                if (ColumnConstant.TYPE_CLEAN.equals(formData.getString("dataType"))
                        && dataId != null && dataId >= actionId) {
                    matched.add(widget);
                }
            }
        }
        return matched;
    }

    /**
     * Marks a task as auto-join enabled by storing {@code "autoJoin": "yes"} in its data JSON
     * and saving the task.
     *
     * <p>The flag is written for any task; an earlier restriction to ETL JOIN nodes is no
     * longer active.
     *
     * @param taskId id of the task to flag
     * @return always {@code true}
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean setAutoJoinFlag(Long taskId) {
        TaskDTO etlNode = this.queryById(taskId);
        JSONObject conf = etlNode.view().getData();
        conf.put("autoJoin", "yes");
        String flaggedJson = conf.toJSONString();
        etlNode.setDataJson(flaggedJson);
        this.update(etlNode);
        return true;
    }

    /**
     * Applies the top auto-join / auto-union recommendation to an ETL join or union task.
     *
     * <p>For tasks that are not ETL join/union nodes nothing is changed. Otherwise the
     * recommendation service is asked for rules; the first rule is copied into the task's
     * {@code conditions} when its score reaches the minimum score of the corresponding operator
     * ({@code Join.getMinRecScore()} / {@code Union.getMinRecScore()}). When there is no usable
     * rule, an empty {@code conditions} array is stored. The updated configuration is persisted.
     *
     * @param taskId id of the task to update
     * @return always {@code true}
     */
    public boolean triggerAutoJoin(Long taskId) {
        TaskDTO etlNode = this.queryById(taskId);
        int type = etlNode.getType();
        // Test the task type before touching "algType": non-ETL configurations may not carry
        // that key, and unboxing a missing value would throw.
        if (type != TaskTypeEnum.TASK_TYPE_ETL.getVal()) {
            return true;
        }
        JSONObject conf = etlNode.view().getData();
        Integer configuredAlgType = conf.getInteger("algType");
        if (configuredAlgType == null) {
            return true;
        }
        int algType = configuredAlgType;
        if (ETLEnum.JOIN.getVal() != algType && ETLEnum.UNION.getVal() != algType) {
            return true;
        }

        List<String> parentIds = Arrays.asList(etlNode.getParentId().split(","));
        conf = autoJoinService.autoJoinRecommend(conf, parentIds);

        int recommendedAlgType = conf.getIntValue("algType");
        boolean isJoin = ETLEnum.JOIN.getVal() == recommendedAlgType;
        boolean isUnion = !isJoin && ETLEnum.UNION.getVal() == recommendedAlgType;

        JSONArray recommendRules = null;
        if (isJoin) {
            recommendRules = conf.getJSONArray("autoJoinRecommendation");
        } else if (isUnion) {
            recommendRules = conf.getJSONArray("autoUnionRecommendation");
        }

        JSONArray conditions = new JSONArray();
        // Only the first (index 0) recommendation is considered.
        if (recommendRules != null && !recommendRules.isEmpty()) {
            JSONObject rule = recommendRules.getJSONObject(0);
            Double score = rule == null ? null : rule.getDouble("score");
            if (score != null) {
                boolean accepted = (isJoin && score >= Join.getMinRecScore())
                        || (isUnion && score >= Union.getMinRecScore());
                if (accepted) {
                    conditions.add(SerializationUtils.clone(rule));
                }
            }
        }
        conf.put("conditions", conditions);
        etlNode.setDataJson(JSONObject.toJSONString(conf));
        this.update(etlNode);
        return true;
    }

    /**
     * Computes match statistics for a join node by building a full outer join of its two input
     * tables and counting matched / unmatched distinct rows on each side.
     *
     * <p>If the task configuration already contains a {@code joinResult} object, it is returned
     * as is. Otherwise five database views are created (the outer join plus left/right
     * match/not-match views, each suffixed with the current timestamp), their row counts are
     * queried, and both counts and view names are returned.
     *
     * <p>NOTE(review): column names and operators from {@code vo.getData().conditions} are
     * spliced into SQL text without quoting or validation; confirm they are trusted or validate
     * them upstream.
     *
     * @param vo task view carrying the task id, pipeline id and the join {@code conditions};
     *           its exception field is set when fewer than two input tables are connected
     * @return the statistics object; an empty object when the task has no parent, no join
     *         conditions, or the database work fails; {@code null} when fewer than two input
     *         tables are available
     */
    public JSONObject queryJoinResult(TaskVO vo) {
        JSONObject ret = new JSONObject();

        Long taskId = vo.getId();
        TaskDTO task = queryById(taskId);
        JSONObject conf = JSONObject.parseObject(task.getDataJson());
        if (conf != null && conf.getJSONObject("joinResult") != null) {
            return conf.getJSONObject("joinResult");
        }

        long timeStamp = System.currentTimeMillis();
        String viewTemplate = String
                .format(SqlTemplate.VIEW_TABLE_NAME, vo.getPipelineId(), vo.getId());

        if (StringUtils.isEmpty(task.getParentId())) {
            logger.warn("parentId is empty!!!");
            return ret;
        }

        TaskVO taskVO = this.queryFullInputById(vo.getId());
        JSONArray input = taskVO.getData().getJSONArray("input");
        if (input == null || input.size() < 2) {
            vo.setException(new IndexOutBoundException("输入表少于两个，请重新连接join节点"));
            return null;
        }
        String tableLeft = input.getJSONObject(0).getString("tableName");
        String tableRight = input.getJSONObject(1).getString("tableName");

        JSONObject voData = vo.getData();
        JSONArray conditions = voData == null ? null : voData.getJSONArray("conditions");
        if (conditions == null || conditions.isEmpty()) {
            // Without at least one condition the "on" clause would be empty and the SQL invalid.
            logger.warn("join conditions are empty, taskId={}", taskId);
            return ret;
        }
        List<String> predicates = new ArrayList<>();
        for (int i = 0; i < conditions.size(); i++) {
            JSONObject condition = conditions.getJSONObject(i);
            String left = condition.getString("leftHeaderName");
            String right = condition.getString("rightHeaderName");
            String operator = condition.getString("operator");
            predicates.add(String.format("a.%s %s b.%s", left, operator, right));
        }
        String onClause = Joiner.on(" and ").join(predicates);

        List<String> leftFields = input.getJSONObject(0).getJSONArray("tableCols")
                .toJavaList(String.class);
        List<String> rightFields = input.getJSONObject(1).getJSONArray("tableCols")
                .toJavaList(String.class);
        List<String> leftFieldKeys = SqlTemplate.getColumns(leftFields);
        List<String> rightFieldKeys = SqlTemplate.getColumns(rightFields);

        String selectLeftColumns = Joiner.on(", ")
                .join(SqlTemplate.columnsRenameSql(leftFields, "a", null, rightFieldKeys));
        String selectRightColumns = Joiner.on(", ")
                .join(SqlTemplate.columnsRenameSql(rightFields, "b", null, leftFieldKeys));

        // The on-clause is passed as an argument (not concatenated into the format string) so
        // that a '%' inside a column name cannot be misread as a format specifier.
        String outerJoinSql = String.format(
                "select row_number() over() as _record_id_, %s, %s from %s a full outer join %s b on %s",
                selectLeftColumns, selectRightColumns, tableLeft, tableRight, onClause);
        String outerJoinView = viewTemplate + "outer_join_" + timeStamp;

        List<String> renameLeftFields = SqlTemplate
                .columnsRename(leftFields, "a", null, rightFieldKeys);
        List<String> renameRightFields = SqlTemplate
                .columnsRename(rightFields, "b", null, leftFieldKeys);
        List<String> leftNotNullCols = new ArrayList<>();
        for (String left : renameLeftFields) {
            leftNotNullCols.add(left + " is not null");
        }
        List<String> rightNotNullCols = new ArrayList<>();
        for (String right : renameRightFields) {
            rightNotNullCols.add(right + " is not null");
        }

        String matchTemplate = "select distinct %s from %s where %s is not null and %s";
        String notMatchTemplate = "select distinct %s from %s where %s is null and %s";
        String leftColumns = Joiner.on(", ").join(renameLeftFields);
        String rightColumns = Joiner.on(", ").join(renameRightFields);
        String leftNotNull = Joiner.on(" and ").join(leftNotNullCols);
        String rightNotNull = Joiner.on(" and ").join(rightNotNullCols);

        String leftMatchSql = String
                .format(matchTemplate, leftColumns, outerJoinView, "_record_id__b", leftNotNull);
        String leftNotMatchSql = String
                .format(notMatchTemplate, leftColumns, outerJoinView, "_record_id__b", leftNotNull);
        String rightMatchSql = String
                .format(matchTemplate, rightColumns, outerJoinView, "_record_id__a", rightNotNull);
        String rightNotMatchSql = String
                .format(notMatchTemplate, rightColumns, outerJoinView, "_record_id__a", rightNotNull);

        String leftMatchView = viewTemplate + "left_match_" + timeStamp;
        String leftNotMatchView = viewTemplate + "left_not_match_" + timeStamp;
        String rightMatchView = viewTemplate + "right_match_" + timeStamp;
        String rightNotMatchView = viewTemplate + "right_not_match_" + timeStamp;

        Connection conn = null;
        try {
            conn = gpDataProvider.getConn(1L);
            // try-with-resources guarantees the statement is released even when a query fails.
            try (Statement st = conn.createStatement()) {
                st.execute(String.format(SqlTemplate.CREATE_VIEW_SQL, outerJoinView, outerJoinSql));
                st.execute(String.format(SqlTemplate.CREATE_VIEW_SQL, leftMatchView, leftMatchSql));
                st.execute(String
                        .format(SqlTemplate.CREATE_VIEW_SQL, leftNotMatchView, leftNotMatchSql));
                st.execute(String.format(SqlTemplate.CREATE_VIEW_SQL, rightMatchView, rightMatchSql));
                st.execute(String
                        .format(SqlTemplate.CREATE_VIEW_SQL, rightNotMatchView, rightNotMatchSql));

                long leftMatch = countJoinViewRows(st, leftMatchView);
                long rightMatch = countJoinViewRows(st, rightMatchView);
                long leftNotMatch = countJoinViewRows(st, leftNotMatchView);
                long rightNotMatch = countJoinViewRows(st, rightNotMatchView);

                // Populate the result only after every query succeeded, so a failure never
                // yields a partially filled object.
                ret.put("leftMatch", leftMatch);
                ret.put("rightMatch", rightMatch);
                ret.put("leftNotMatch", leftNotMatch);
                ret.put("rightNotMatch", rightNotMatch);
                ret.put("leftMatchView", leftMatchView);
                ret.put("leftNotMatchView", leftNotMatchView);
                ret.put("rightMatchView", rightMatchView);
                ret.put("rightNotMatchView", rightNotMatchView);
            }
        } catch (Exception e) {
            logger.error("TaskService queryJoinResult error, errMsg={}", e.getMessage(), e);
        } finally {
            JDBCUtil.close(conn, null, null);
        }
        return ret;
    }

    /**
     * Runs {@code select count(*)} against a view and returns the single count value.
     *
     * @param st       open statement to run the query on
     * @param viewName name of the view (or table) to count
     * @return the row count
     * @throws java.sql.SQLException if the query fails or returns no row
     */
    private long countJoinViewRows(Statement st, String viewName) throws java.sql.SQLException {
        try (ResultSet rs = st.executeQuery(String.format("select count(*) from %s", viewName))) {
            rs.next();
            return rs.getLong(1);
        }
    }

    /**
     * Resolves a table-name template to a concrete table name.
     *
     * <p>Only names ending with {@code "_"} that split into at least three {@code "_"}-separated
     * segments are treated as templates; the last segment is parsed as the task id. The name and
     * task id are handed to {@code tColumnService.checkTableIfExits}, and when that reports
     * {@code true} the table name held by the query object afterwards is returned (presumably
     * the service fills in the resolved name; verify against the service). In every other case,
     * including any exception, the input is returned unchanged.
     *
     * @param nameTemplate table name or table-name template
     * @return the resolved table name, or {@code nameTemplate} itself when it cannot be resolved
     */
    private String getFullTableName(String nameTemplate) {
        if (!nameTemplate.endsWith("_")) {
            return nameTemplate;
        }
        ColumnQueryVO query = new ColumnQueryVO();
        query.setTable(nameTemplate);
        String[] segments = nameTemplate.split("_");
        if (segments.length < 3) {
            return nameTemplate;
        }
        try {
            Long parsedTaskId = Long.parseLong(segments[segments.length - 1]);
            query.setTaskId(parsedTaskId);
            if (tColumnService.checkTableIfExits(query)) {
                return query.getTable();
            }
            return nameTemplate;
        } catch (Exception e) {
            // Non-numeric suffix or lookup failure: fall back to the unresolved name.
            return nameTemplate;
        }
    }

    /**
     * Prepares the HTTP response headers for a CSV file download.
     *
     * <p>The file name sent to the client is {@code download_<name>.csv}, with {@code name}
     * URL-encoded as UTF-8. If encoding fails, the name is used as given.
     *
     * @param response the servlet response to configure
     * @param name     base name of the downloaded file (without prefix and extension)
     */
    public void setHeaderForResponse(HttpServletResponse response, String name) {
        // Only the last setContentType call takes effect, and its argument must be a bare
        // media type (no "Content-type:" prefix), so set a single well-formed value.
        response.setContentType("application/vnd.ms-excel;charset=UTF-8");
        String encodedName = name;
        try {
            encodedName = URLEncoder.encode(name, StandardCharsets.UTF_8.name());
        } catch (Exception e) {
            logger.error("failed to URL-encode download file name, errMsg={}", e.getMessage(), e);
        }
        response.setHeader("Content-disposition",
                "attachment;filename=\"download_" + encodedName + ".csv\"");
    }

    /**
     * Streams the output table of a task to the client as a CSV download.
     *
     * <p>The table is located depending on the task type:
     * <ul>
     *   <li>clean / data tasks: the first entry of {@code output} in the task configuration;</li>
     *   <li>model / algopy tasks: the first entry of {@code output} in the raw data JSON,
     *       prefixed with {@code "dataset."} when it has no schema;</li>
     *   <li>all other tasks: the table extracted from the latest task instance.</li>
     * </ul>
     *
     * @param response servlet response the CSV is written to
     * @param taskId   id of the task whose output is downloaded
     * @return {@code true} when the data was written; {@code false} when no output table could
     *         be determined or writing failed
     * @throws RuntimeException when the task has no latest instance (non clean/data/model types)
     */
    public boolean downloadTable(HttpServletResponse response, Long taskId) {
        TaskDTO taskDTO = queryById(taskId);
        List<String> tables = new ArrayList<>();
        int type = taskDTO.getType();
        if (type == TaskTypeEnum.TASK_TYPE_CLEAN.getVal()
                || type == TaskTypeEnum.TASK_TYPE_DATA.getVal()) {
            JSONArray output = taskDTO.view().getData().getJSONArray("output");
            // A task that has not produced output yet may lack the "output" key entirely.
            if (output == null || output.isEmpty()) {
                return false;
            }
            JSONObject item = output.getJSONObject(0);
            String tableName = item == null ? null : item.getString("tableName");
            if (StringUtils.isEmpty(tableName)) {
                return false;
            }
            tables.add(tableName);
        } else if (type == TaskTypeEnum.TASK_TYPE_MODEL.getVal()
                || type == TaskTypeEnum.TASK_TYPE_ALGOPY.getVal()) {
            try {
                JSONObject data = JSONObject.parseObject(taskDTO.getDataJson());
                JSONObject output = data.getJSONArray("output").getJSONObject(0);
                String otableName = output.getString("tableName");
                if (!otableName.contains(".")) {
                    otableName = "dataset." + otableName;
                }
                tables.add(otableName);
            } catch (Exception e) {
                // Missing or malformed output section: nothing to download.
                logger.warn("unable to set table name", e);
                return false;
            }
        } else {
            TaskInstanceDTO taskInstanceDTO = taskInstanceService
                    .queryLatestInstanceForTask(taskId);
            if (taskInstanceDTO == null) {
                throw new RuntimeException("数据下载错误, 节点执行失败或找不到最近一次执行成功记录!");
            }
            PipelineUtil
                    .extractTableName(com.google.common.collect.Lists.newArrayList(taskInstanceDTO),
                            tables);
            if (tables.isEmpty()) {
                logger.error("table is empty!!!");
                return false;
            }
        }
        String name =
                StringUtils.isNotEmpty(taskDTO.getName()) ? taskDTO.getName() : taskId.toString();
        return downloadData(response, tables, name);
    }

    /**
     * Writes the content of the first table in {@code tables} to the response as a UTF-8 CSV
     * file (with BOM), excluding the internal {@code _record_id_} column.
     *
     * <p>NOTE(review): values are joined with plain commas after passing through
     * {@code SqlUtil.formatPGSqlColName}; whether that helper escapes commas, quotes or line
     * breaks inside values is not visible here. Confirm CSV escaping is adequate.
     *
     * @param response servlet response the CSV is written to
     * @param tables   candidate tables; only the first entry is used. A name may be
     *                 schema-qualified ({@code schema.table}); otherwise schema {@code dataset}
     *                 is assumed
     * @param name     base name of the downloaded file
     * @return {@code true} when the data was written; {@code false} when {@code tables} is
     *         empty or writing to the response failed
     */
    public boolean downloadData(HttpServletResponse response, List<String> tables, String name) {
        if (tables == null || tables.isEmpty()) {
            logger.error("table is empty!!!");
            return false;
        }
        String table = tables.get(0);
        String schema = "dataset";
        if (table.contains(".")) {
            String[] tmps = table.split("[.]");
            table = tmps[1];
            schema = tmps[0];
        }
        Map<String, String> meta = gpDataProvider
                .getTableMetaMap(table.replaceAll("\"", ""), schema);

        List<String> cols = new ArrayList<>(meta.keySet());
        cols.remove("_record_id_");

        JSONArray data = gpDataProvider
                .getPreviewData(ToolUtil.alignTableName(tables.get(0), 0L), cols, -1);

        this.setHeaderForResponse(response, name);

        StringBuilder sb = new StringBuilder();
        sb.append(Joiner.on(",").join(SqlUtil.formatPGSqlCols(cols))).append("\n");
        for (int i = 0; i < data.size(); ++i) {
            JSONObject item = data.getJSONObject(i);
            List<String> values = new ArrayList<>(cols.size());
            for (String col : cols) {
                String value = item.getString(col);
                values.add(value == null ? "" : SqlUtil.formatPGSqlColName(value));
            }
            sb.append(Joiner.on(",").join(values)).append("\n");
        }

        try {
            // The response stream is owned by the servlet container, so it is not closed here.
            OutputStream outputStream = response.getOutputStream();
            // A UTF-8 BOM keeps spreadsheet tools from garbling non-ASCII (e.g. Chinese) text.
            outputStream.write(new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF});
            outputStream.write(sb.toString().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            logger.error("TaskService downloadData error, errMsg={}", e.getMessage(), e);
            return false;
        }
        return true;
    }

    /**
     * Loads a task view and completes the table names of its {@code input} entries.
     *
     * <p>Each input entry's {@code tableName} is passed through {@code getFullTableName} and
     * {@code ToolUtil.alignTableName(..., 0L)} and written back. When the task data has no
     * {@code input} key, an empty array is stored under it.
     *
     * @param id id of the task to load
     * @return the task view with completed input table names
     */
    public TaskVO queryFullInputById(Long id) {
        TaskVO vo = this.queryById(id).view();
        JSONObject data = vo.getData();

        JSONArray input;
        if (!data.containsKey("input")) {
            input = new JSONArray();
            data.put("input", input);
            vo.setData(data);
        } else {
            input = data.getJSONArray("input");
        }

        int total = input.size();
        for (int idx = 0; idx < total; idx++) {
            JSONObject entry = input.getJSONObject(idx);
            String resolvedName = this.getFullTableName(entry.getString("tableName"));
            entry.put("tableName", ToolUtil.alignTableName(resolvedName, 0L));
            input.set(idx, entry);
        }
        return vo;
    }

    /**
     * Saves a task's output table (or an ML model table) as a dataset.
     *
     * <p>The source table is copied into a new table in schema {@code dataset}. If a dataset
     * with the same category and name already exists, its metadata is updated and the new table
     * replaces the dataset's existing table (old table dropped, new table renamed to the old
     * name); otherwise a new dataset record is inserted.
     *
     * <p>NOTE(review): the dataset metadata is written before the table is created and the two
     * steps are not in one transaction, so a SQL failure leaves the metadata change in place.
     *
     * @param vo save request: task id or ML table name, category id or name, dataset name
     * @return {@code true} when the table was created; {@code false} when the dataset record
     *         could not be saved or the SQL failed
     * @throws DataScienceException when the task does not exist, its output table cannot be
     *                              determined, or neither category id nor category name is given
     */
    public Boolean saveToDataset(TaskSaveVO vo) {
        long userId = JwtUtil.getCurrentUserId();
        String tbName = "";
        if (vo.getTableNameML() == null) {
            TaskDTO task = taskMapper.queryById(vo.getTaskId());
            if (task == null) {
                throw new DataScienceException(BaseErrorCode.TASK_NOT_EXIST);
            }
            int type = task.getType();
            if (type == TaskTypeEnum.TASK_TYPE_MODEL.getVal()
                    || type == TaskTypeEnum.TASK_TYPE_ALGOPY.getVal()
                    || type == TaskTypeEnum.TASK_TYPE_JLAB.getVal()) {
                try {
                    JSONObject data = JSONObject.parseObject(task.getDataJson());
                    JSONObject output = data.getJSONArray("output").getJSONObject(0);
                    String otableName = output.getString("tableName");
                    if (!otableName.contains(".")) {
                        otableName = "dataset." + otableName;
                    }
                    tbName = otableName;
                } catch (Exception e) {
                    logger.warn("unable to set table name", e);
                }
            } else {
                List<String> tables = getTaskInstanceTable(task);
                if (tables.isEmpty()) {
                    throw new DataScienceException(BaseErrorCode.TASK_OUTPUT_TABLE_NOT_EXIST);
                }
                tbName = ToolUtil.alignTableName(tables.get(0), 0L);
            }
        } else {
            // Use the ML model table as the source table.
            tbName = "ml_model." + vo.getTableNameML();
        }
        // Stop before touching dataset metadata when there is no source table: the copy
        // statement below could only fail, leaving a dataset record without a table.
        if (StringUtils.isBlank(tbName)) {
            throw new DataScienceException(BaseErrorCode.TASK_OUTPUT_TABLE_NOT_EXIST);
        }

        String schema = "dataset";
        String newTable = ImportDataService.generateGpTableName() + "_p";
        String viewToTablesql =
                "create table " + schema + "." + newTable + " as select * from " + tbName + ";";
        String oldTable = "";

        DatasetJsonInfo dji = new DatasetJsonInfo();
        dji.setSchema(schema);
        dji.setType("pipeline");
        dji.setTable(newTable);
        // The task id is stored as "source"; together with "type" it allows locating the
        // originating task later.
        dji.setSource(String.valueOf(vo.getTaskId()));

        Long categoryId = vo.getCategoryId();
        if (categoryId == null) {
            if (StringUtils.isNotBlank(vo.getCategoryName())) {
                DatasetCategoryDTO dcDTO = new DatasetCategoryDTO();
                dcDTO.setName(vo.getCategoryName());
                dcDTO.setUserId(JwtUtil.getCurrentUserId());
                dcDTO.setGmtCreator(JwtUtil.getCurrentUserId());
                dcDTO.setGmtModifier(JwtUtil.getCurrentUserId());
                categoryId = datasetCategoryService.save(dcDTO);
            } else {
                // NOTE(review): TASK_NOT_EXIST is reported for a missing category; confirm
                // whether a more specific error code exists.
                throw new DataScienceException(BaseErrorCode.TASK_NOT_EXIST);
            }
        }

        DatasetDTO dataset = datasetService
                .queryByCategoryIdAndDatasetName(categoryId, vo.getDatasetName());
        boolean flag;
        if (dataset != null) {
            // Keep the existing dataset's table name unchanged.
            if (StringUtils.isNotBlank(dataset.getDataJson())) {
                JSONObject dj = JSONObject.parseObject(dataset.getDataJson());
                oldTable = dj.getString("table");
                if (StringUtils.isNotBlank(oldTable)) {
                    dji.setTable(oldTable);
                }
            }
            // Update the dataset configuration.
            dataset.setDataJson(JSON.toJSONString(dji));
            flag = datasetService.update(dataset);
        } else {
            // Create a new dataset.
            long res = datasetService
                    .insert(datasetService.buildDataset(userId, categoryId, vo.getDatasetName(), dji));
            flag = res > 0;
        }
        if (!flag) {
            return false;
        }

        Connection conn = null;
        try {
            conn = gpDataProvider.getConn(1L);
            try (Statement stmt = conn.createStatement()) {
                // Materialize the view/table into the new table.
                stmt.executeUpdate(viewToTablesql);

                if (StringUtils.isNotBlank(oldTable)) {
                    // Drop the old table and everything depending on it.
                    String dropSql = String
                            .format("drop table if exists %s CASCADE;", schema + "." + oldTable);
                    logger.info(dropSql);
                    stmt.executeUpdate(dropSql);
                    // Rename the new table to the old table's name.
                    String renameSql = String
                            .format("alter table %s rename to %s;", schema + "." + newTable, oldTable);
                    logger.info(renameSql);
                    stmt.executeUpdate(renameSql);
                }
            }
            return true;
        } catch (Exception e) {
            logger.error("TaskService saveToDataset error, errMsg={}", e.getMessage(), e);
        } finally {
            JDBCUtil.close(conn, null, null);
        }
        return false;
    }

    /**
     * Determines the output table name(s) of a task (modeled on the download feature).
     *
     * <p>For clean / data tasks the first entry of {@code output} in the task configuration is
     * used; for all other types the table is extracted from the latest task instance.
     *
     * @param taskDTO the task whose output table is looked up
     * @return the table names found; empty when the task has no output table
     * @throws RuntimeException when a non clean/data task has no latest instance
     */
    private List<String> getTaskInstanceTable(TaskDTO taskDTO) {
        List<String> tables = new ArrayList<>();
        int type = taskDTO.getType();
        if (type == TaskTypeEnum.TASK_TYPE_CLEAN.getVal()
                || type == TaskTypeEnum.TASK_TYPE_DATA.getVal()) {
            JSONArray output = taskDTO.view().getData().getJSONArray("output");
            // The "output" key may be absent when the task has not produced output yet.
            if (output == null || output.isEmpty()) {
                return tables;
            }
            JSONObject item = output.getJSONObject(0);
            tables.add(item.getString("tableName"));
        } else {
            TaskInstanceDTO taskInstanceDTO = taskInstanceService
                    .queryLatestInstanceForTask(taskDTO.getId());
            if (taskInstanceDTO == null) {
                throw new RuntimeException("数据下载错误, 节点执行失败或找不到最近一次执行成功记录!");
            }
            PipelineUtil
                    .extractTableName(com.google.common.collect.Lists.newArrayList(taskInstanceDTO),
                            tables);
            if (tables.isEmpty()) {
                logger.error("table is empty!!!");
            }
        }
        return tables;
    }

    /**
     * Finds the tasks related to a dataset within a pipeline.
     *
     * <p>Two sources are combined: the task id recorded in the dataset's {@code source} field
     * (only for datasets of type {@code "pipeline"}), and the tasks returned by
     * {@code taskMapper.queryByTableNameAndPipelineId} for the dataset's table.
     *
     * @param datasetId  id of the dataset
     * @param pipelineId id of the pipeline to search in
     * @return ids of the related tasks; empty when the dataset is unknown or has no metadata
     */
    public Set<Long> locateDataSource(Long datasetId, Long pipelineId) {
        Set<Long> taskIds = new HashSet<>();
        DatasetDTO dataset = datasetService.queryById(datasetId);
        if (dataset == null) {
            return taskIds;
        }
        // parseObject yields null for a null/empty JSON string.
        JSONObject data = JSONObject.parseObject(dataset.getDataJson());
        if (data == null) {
            return taskIds;
        }
        if ("pipeline".equals(data.getString("type")) && data.containsKey("source")) {
            try {
                taskIds.add(Long.valueOf(data.getString("source")));
            } catch (NumberFormatException ignored) {
                // The source is not a numeric task id (e.g. "null"); rely on the table lookup.
            }
        }
        String tableName = data.getString("table");
        if (StringUtils.isNotBlank(tableName)) {
            List<Long> res = taskMapper.queryByTableNameAndPipelineId(tableName, pipelineId);
            if (res != null) {
                taskIds.addAll(res);
            }
        }
        return taskIds;
    }

    /**
     * Returns the task ids reported by {@code taskMapper.queryIdByPipeline} for the given id.
     *
     * @param projectId id forwarded to the mapper (NOTE(review): the mapper method name
     *                  suggests this is a pipeline id; confirm)
     * @return the task ids returned by the mapper
     */
    public List<Long> queryIdByPepelineId(long projectId) {
        List<Long> taskIds = taskMapper.queryIdByPipeline(projectId);
        return taskIds;
    }

    /**
     * Saves several tasks in one mapper call.
     *
     * @param tasks the tasks to save
     * @return {@code true} when the mapper reports a positive result
     */
    public boolean batchSave(List<TaskDTO> tasks) {
        int affected = taskMapper.batchSave(tasks);
        return affected > 0;
    }

    /**
     * Saves a task's output table (or an ML model table) as a dataset and links a newly created
     * dataset to the request's project.
     *
     * <p>The source table is copied into a new table in schema {@code dataset}. If a dataset
     * with the same category and name already exists, its metadata is updated and the new table
     * replaces the dataset's existing table (old table dropped, new table renamed to the old
     * name); otherwise a new dataset record is inserted and a dataset-project relation is added.
     *
     * <p>NOTE(review): the dataset metadata is written before the table is created and the two
     * steps are not in one transaction, so a SQL failure leaves the metadata change in place.
     *
     * @param vo save request: task id or ML table name, category id or name, dataset name,
     *           project id
     * @return {@code true} when the table was created; {@code false} when the dataset record
     *         could not be saved or the SQL failed
     * @throws DataScienceException when the task does not exist, its output table cannot be
     *                              determined, or neither category id nor category name is given
     */
    public Boolean saveToDataSource(TaskSaveVO vo) {
        long userId = JwtUtil.getCurrentUserId();
        String tbName = "";
        if (vo.getTableNameML() == null) {
            TaskDTO task = taskMapper.queryById(vo.getTaskId());
            if (task == null) {
                throw new DataScienceException(BaseErrorCode.TASK_NOT_EXIST);
            }
            int type = task.getType();
            if (type == TaskTypeEnum.TASK_TYPE_MODEL.getVal()
                    || type == TaskTypeEnum.TASK_TYPE_ALGOPY.getVal()
                    || type == TaskTypeEnum.TASK_TYPE_JLAB.getVal()) {
                try {
                    JSONObject data = JSONObject.parseObject(task.getDataJson());
                    JSONObject output = data.getJSONArray("output").getJSONObject(0);
                    String otableName = output.getString("tableName");
                    if (!otableName.contains(".")) {
                        otableName = "dataset." + otableName;
                    }
                    tbName = otableName;
                } catch (Exception e) {
                    logger.warn("unable to set table name", e);
                }
            } else {
                List<String> tables = getTaskInstanceTable(task);
                if (tables.isEmpty()) {
                    throw new DataScienceException(BaseErrorCode.TASK_OUTPUT_TABLE_NOT_EXIST);
                }
                tbName = ToolUtil.alignTableName(tables.get(0), 0L);
            }
        } else {
            // Use the ML model table as the source table.
            tbName = "ml_model." + vo.getTableNameML();
        }
        // Stop before touching dataset metadata when there is no source table: the copy
        // statement below could only fail, leaving a dataset record without a table.
        if (StringUtils.isBlank(tbName)) {
            throw new DataScienceException(BaseErrorCode.TASK_OUTPUT_TABLE_NOT_EXIST);
        }

        String schema = "dataset";
        String newTable = ImportDataService.generateGpTableName() + "_p";
        String viewToTablesql =
                "create table " + schema + "." + newTable + " as select * from " + tbName + ";";
        String oldTable = "";

        DatasetJsonInfo dji = new DatasetJsonInfo();
        dji.setSchema(schema);
        dji.setType("pipeline");
        dji.setTable(newTable);
        // The task id is stored as "source"; together with "type" it allows locating the
        // originating task later.
        dji.setSource(String.valueOf(vo.getTaskId()));

        Long categoryId = vo.getCategoryId();
        if (categoryId == null) {
            if (StringUtils.isNotBlank(vo.getCategoryName())) {
                DatasetCategoryDTO dcDTO = new DatasetCategoryDTO();
                dcDTO.setName(vo.getCategoryName());
                dcDTO.setUserId(JwtUtil.getCurrentUserId());
                dcDTO.setGmtCreator(JwtUtil.getCurrentUserId());
                dcDTO.setGmtModifier(JwtUtil.getCurrentUserId());
                categoryId = datasetCategoryService.save(dcDTO);
            } else {
                // NOTE(review): TASK_NOT_EXIST is reported for a missing category; confirm
                // whether a more specific error code exists.
                throw new DataScienceException(BaseErrorCode.TASK_NOT_EXIST);
            }
        }

        DatasetDTO dataset = datasetService
                .queryByCategoryIdAndDatasetName(categoryId, vo.getDatasetName());
        boolean flag = false;
        if (dataset != null) {
            // Keep the existing dataset's table name unchanged.
            if (StringUtils.isNotBlank(dataset.getDataJson())) {
                JSONObject dj = JSONObject.parseObject(dataset.getDataJson());
                oldTable = dj.getString("table");
                if (StringUtils.isNotBlank(oldTable)) {
                    dji.setTable(oldTable);
                }
            }
            // Update the dataset configuration.
            dataset.setDataJson(JSON.toJSONString(dji));
            flag = datasetService.update(dataset);
        } else {
            // Create a new dataset.
            long res = datasetService
                    .insert(datasetService.buildDataset(userId, categoryId, vo.getDatasetName(), dji));
            if (res > 0) {
                // Add a row to the dataset_project table linking the dataset to the project.
                DatasetProjectDTO dpd = new DatasetProjectDTO();
                dpd.setDatasetId(res);
                dpd.setProjectId(vo.getProjectId());
                datasetProjectService.add(Arrays.asList(dpd));
                flag = true;
            }
        }
        if (!flag) {
            return false;
        }

        Connection conn = null;
        try {
            conn = gpDataProvider.getConn(1L);
            try (Statement stmt = conn.createStatement()) {
                // Materialize the view/table into the new table.
                stmt.executeUpdate(viewToTablesql);

                if (StringUtils.isNotBlank(oldTable)) {
                    // Drop the old table and everything depending on it.
                    String dropSql = String
                            .format("drop table if exists %s CASCADE;", schema + "." + oldTable);
                    logger.info(dropSql);
                    stmt.executeUpdate(dropSql);
                    // Rename the new table to the old table's name.
                    String renameSql = String
                            .format("alter table %s rename to %s;", schema + "." + newTable, oldTable);
                    logger.info(renameSql);
                    stmt.executeUpdate(renameSql);
                }
            }
            return true;
        } catch (Exception e) {
            logger.error("TaskService saveToDataSource error, errMsg={}", e.getMessage(), e);
        } finally {
            JDBCUtil.close(conn, null, null);
        }
        return false;
    }

    /**
     * Checks whether two tasks are based on source tables with the same column names and
     * column types.
     *
     * <p>The first output entry of {@code preTaskId} is compared with the first input entry of
     * the first {@code ORIGINAL_DATA} action of {@code currentId} that carries
     * {@code inputInfo.input}. Column names and types are each compared as collections,
     * ignoring order.
     *
     * @param preTaskId id of the task providing the reference output
     * @param currentId id of the task whose column actions are inspected
     * @return {@code true} when both column names and column types match; {@code false}
     *         otherwise, including when the reference task has no output
     */
    public boolean isSimilarSourceTable(Long preTaskId, Long currentId) {
        JSONObject preData = this.queryById(preTaskId).view().getData();
        // Validate before dereferencing: the data or its "output" array may be missing.
        if (preData == null) {
            return false;
        }
        JSONArray preOutput = preData.getJSONArray("output");
        if (preOutput == null || preOutput.isEmpty()) {
            return false;
        }
        JSONObject preOutItem = preOutput.getJSONObject(0);
        List<String> preTableCols = preOutItem.getJSONArray("tableCols").toJavaList(String.class);
        List<String> preColumnTypes = preOutItem.getJSONArray("columnTypes")
                .toJavaList(String.class);

        // Compare column names and types.
        List<ColumnAction> columnActions = tColumnService.queryAction(currentId);
        if (columnActions == null) {
            return false;
        }
        for (ColumnAction action : columnActions) {
            JSONObject dataInfo = action.getData();
            // Constant-first equals tolerates an action without an "action" name.
            if (!ActionEnum.ORIGINAL_DATA.name().equals(dataInfo.getString("action"))) {
                continue;
            }
            if (!dataInfo.containsKey("inputInfo")
                    || !dataInfo.getJSONObject("inputInfo").containsKey("input")) {
                continue;
            }
            JSONObject inputInfo = dataInfo.getJSONObject("inputInfo").getJSONArray("input")
                    .getJSONObject(0);
            List<String> currTableCols = inputInfo.getJSONArray("tableCols")
                    .toJavaList(String.class);
            List<String> currColumnTypes = inputInfo.getJSONArray("columnTypes")
                    .toJavaList(String.class);
            // Only the first qualifying action decides the result.
            return CollectionUtils.isEqualCollection(preTableCols, currTableCols)
                    && CollectionUtils.isEqualCollection(preColumnTypes, currColumnTypes);
        }
        return false;
    }


    /**
     * Checks whether the output table of one task is the original source table of another.
     *
     * <p>The table name (without schema) of the first output entry of {@code preTaskId} is
     * compared with the {@code table} (without schema) of each {@code ORIGINAL_DATA} action of
     * {@code currentId}.
     *
     * @param preTaskId id of the task providing the output table
     * @param currentId id of the task whose column actions are inspected
     * @return {@code true} when an original-data action refers to the same table;
     *         {@code false} otherwise, including when the output is missing or any lookup fails
     */
    public boolean isSameSourceTable(Long preTaskId, Long currentId) {
        List<ColumnAction> columnActions = tColumnService.queryAction(currentId);
        TaskDTO preTask = this.queryById(preTaskId);
        JSONObject data = preTask.view().getData();
        // Validate before dereferencing: the data or its "output" array may be missing.
        if (data == null) {
            return false;
        }
        JSONArray output = data.getJSONArray("output");
        if (output == null || output.isEmpty()) {
            return false;
        }
        JSONObject item = output.getJSONObject(0);
        String tableName = item == null ? null : item.getString("tableName");
        if (tableName == null) {
            return false;
        }
        String schema = "dataset";
        String table;
        String[] splits = tableName.split("\\.");
        if (splits.length > 1) {
            schema = splits[0];
            table = splits[splits.length - 1];
        } else {
            table = tableName;
        }
        try {
            // NOTE(review): the metadata itself is unused; the call is retained because a
            // failure here makes the method return false. Confirm whether it is meant as an
            // existence check.
            gpDataProvider.getTableMetaMap(table.replaceAll("\"", ""), schema);
            for (ColumnAction action : columnActions) {
                JSONObject dataInfo = action.getData();
                String actionName = dataInfo.getString("action");
                if (actionName.equals(ActionEnum.ORIGINAL_DATA.name())) {
                    String orgTable = dataInfo.getString("table");
                    String[] tmps = orgTable.split("\\.");
                    if (tmps[tmps.length - 1].equals(table)) {
                        return true;
                    }
                }
            }
        } catch (Exception e) {
            return false;
        }
        return false;
    }

    /**
     * Re-reads the output description of a task from its most recent algorithm log.
     *
     * <p>Some custom operators finish successfully without any data being shown, so the output
     * has to be looked up once more. The first entry returned by
     * {@code taskInstanceService.queryAlgoRecentLog} is parsed as JSON and every element of
     * {@code result.output_params} is converted by {@code wrapperOutputFormat}.
     *
     * @param taskDTO task whose id and project id select the logs
     * @return one entry per output parameter; an empty array when there is no log, the log has
     *         no {@code result.output_params}, or anything fails while converting
     */
    public JSONArray queryAlgoRecentLog(TaskDTO taskDTO) {
        List<String> logs = taskInstanceService
                .queryAlgoRecentLog(taskDTO.getId(), taskDTO.getProjectId());
        if (logs == null || logs.isEmpty()) {
            // Not even a log exists: treat as failure.
            return new JSONArray();
        }

        try {
            // The first entry is the most recent log.
            JSONObject parsedLog = JSONObject.parseObject(logs.get(0));
            JSONObject result = parsedLog == null ? null : parsedLog.getJSONObject("result");
            JSONArray rawDataList = result == null ? null : result.getJSONArray("output_params");
            // Explicit check instead of catching NullPointerException as control flow.
            if (rawDataList == null) {
                logger.info(String.format(
                        "there is no output params when querying task's (taskId = %s) log in project (projectId = %s);",
                        taskDTO.getId(), taskDTO.getProjectId()));
                return new JSONArray();
            }

            JSONArray loginfos = new JSONArray();
            // Build the view once rather than once per output parameter.
            TaskVO view = taskDTO.view();
            for (Object params : rawDataList) {
                wrapperOutputFormat((JSONObject) params, view, loginfos);
            }
            return loginfos;
        } catch (Exception e) {
            // Partial results are discarded on failure, as before; the throwable is passed
            // along so the stack trace is not lost.
            logger.info(String.format(
                    "something wrong when querying task's (taskId = %s) log in project (projectId = %s) and Exception is %s;",
                    taskDTO.getId(), taskDTO.getProjectId(), e.getMessage()), e);
            return new JSONArray();
        }
    }

    /**
     * Converts one algorithm output-parameter object into an output entry and appends it to
     * {@code array}.
     *
     * <p>Column names, column types and the table name come from {@code params}; the row count
     * and semantic info are copied from the first element of the view's {@code input} array.
     *
     * @param params one element of the log's {@code output_params}
     * @param view   task view supplying the {@code input} data and the node name
     * @param array  target array the new entry is added to
     */
    private void wrapperOutputFormat(JSONObject params, TaskVO view, JSONArray array) {
        String inputJson = view.getData().getString("input");
        List<JSONObject> inputs = JSONArray.parseArray(inputJson, JSONObject.class);
        JSONObject firstInput = inputs.get(0);

        JSONObject entry = new JSONObject();
        entry.put("tableCols", params.getJSONArray("output_cols"));
        entry.put("totalRow", firstInput.getIntValue("totalRow"));
        entry.put("semantic", firstInput.getJSONObject("semantic"));
        entry.put("columnTypes", params.getJSONArray("col_types"));
        entry.put("tableName", params.get("out_table_name"));
        entry.put("nodeName", view.getName());
        array.add(entry);
    }

    /**
     * Verifies that the current user is allowed to operate on the given task.
     *
     * @param taskId id of the task to check
     * @throws DataScienceException with {@code UNAUTHORIZED} when the current user id is 0, or
     *         with {@code TASK_NO_OPERATION_PERMISSION} when the task does not exist or its
     *         user id differs from the current user id
     */
    public void checkAuth(Long taskId) {
        long userId = JwtUtil.getCurrentUserId();
        if (userId == 0) {
            throw new DataScienceException(BaseErrorCode.UNAUTHORIZED);
        }

        TaskDTO task = taskMapper.queryById(taskId);

        // Compare against the primitive local: this avoids resolving the current user a second
        // time and guarantees a numeric (never a boxed-reference) comparison.
        if (task == null || userId != task.getUserId()) {
            throw new DataScienceException(BaseErrorCode.TASK_NO_OPERATION_PERMISSION);
        }
    }

    /**
     * Aligns a child node's configuration with the columns produced by its parent.
     *
     * <p>The parent's first output supplies the column names and types. They are copied into
     * the child's first output (or the parent's output object is used as the child's output
     * when the child has none), the columns accepted by
     * {@code ToolUtil.filterTypeAndCol} for {@code filterType} become the {@code options} of
     * form item {@code index}, and the {@code col} / {@code cols} selections in
     * {@code formData} are reconciled with those columns. Only the first {@code setParams}
     * entry is processed, and {@code setParams} is rewritten to contain just that entry.
     *
     * @param childData  child node data; modified in place
     * @param parentData parent node data; its {@code output[0]} must carry {@code tableCols}
     *                   and {@code columnTypes}
     * @param filterType type filter passed to {@code ToolUtil.filterTypeAndCol}
     * @param index      position of the form item receiving the options; {@code -1} disables
     *                   the whole update
     * @return {@code childData}, unchanged when {@code index} is {@code -1} or when
     *         {@code setParams} is missing or empty
     */
    public JSONObject featureFilter(JSONObject childData, JSONObject parentData, String filterType, int index) {
        if (index == -1) {
            return childData;
        }
        JSONArray setParamsJsonArray = childData.getJSONArray("setParams");
        if (setParamsJsonArray == null || setParamsJsonArray.isEmpty()) {
            return childData;
        }

        JSONObject parentOutput = parentData.getJSONArray("output").getJSONObject(0);
        List<String> inputCols = parentOutput.getJSONArray("tableCols").toJavaList(String.class);
        List<String> columnTypes = parentOutput.getJSONArray("columnTypes").toJavaList(String.class);
        logger.debug("inputCols -> {}", inputCols);

        JSONArray childOutput = childData.getJSONArray("output");
        if (childOutput != null && !childOutput.isEmpty()) {
            JSONObject firstChildOutput = childOutput.getJSONObject(0);
            firstChildOutput.put("tableCols", inputCols);
            firstChildOutput.put("columnTypes", columnTypes);
        } else {
            // The child has no output yet (missing or empty): start from the parent's output.
            JSONArray tempOutput = new JSONArray();
            tempOutput.add(parentOutput);
            childData.put("output", tempOutput);
        }

        List<String> columnsFilterd = ToolUtil.filterTypeAndCol(inputCols, columnTypes, filterType).getKey();
        logger.debug("columnsFilterd -> {}", columnsFilterd);

        JSONObject setParams = setParamsJsonArray.getJSONObject(0);
        JSONArray colsArray = new JSONArray();
        for (String col : columnsFilterd) {
            JSONObject optionPair = new JSONObject();
            optionPair.put("label", col);
            optionPair.put("value", col);
            colsArray.add(optionPair);
        }
        setParams.getJSONArray("formItem").getJSONObject(index)
                .getJSONObject("props").put("options", colsArray);

        JSONObject formData = setParams.getJSONObject("formData");
        if (formData != null) {
            // Single-column selection: replace the placeholder or flag a stale choice.
            String colFormData = formData.getString("col");
            if (colFormData != null) {
                if (colFormData.equals("<无可选特征列>")) {
                    if (!columnsFilterd.isEmpty()) {
                        formData.put("col", columnsFilterd.get(0));
                    } else {
                        formData.put("col", "<请确保父节点有数值型特征列>");
                    }
                } else if (!columnsFilterd.contains(colFormData)) {
                    formData.put("col", "<请重新选择特征列>");
                }
            }

            // Multi-column selection: keep only the columns that are still selectable.
            JSONArray colsFormData = formData.getJSONArray("cols");
            if (colsFormData != null) {
                JSONArray newColsFormData = new JSONArray();
                for (int i = 0; i < colsFormData.size(); i++) {
                    String oneColsFormData = colsFormData.getString(i);
                    if (columnsFilterd.contains(oneColsFormData)) {
                        newColsFormData.add(oneColsFormData);
                    }
                }
                formData.put("cols", newColsFormData);
            }
        }

        JSONArray newsetParamsJsonArray = new JSONArray();
        newsetParamsJsonArray.add(setParams);

        childData.put("setParams", newsetParamsJsonArray);
        return childData;
    }


    /**
     * Resolves the name of a task's first output table from its data JSON.
     *
     * <p>When the stored name ends with {@code _}, the {@code lastTimeStamp} of the output
     * (or, failing that, of the data JSON root) is appended if present. A name without a
     * {@code .} is prefixed with {@code dataset.}.
     *
     * @param taskNode task whose {@code output[0].tableName} is read
     * @return the resolved table name
     */
    public String getOutputTableName(TaskDTO taskNode) {
        JSONObject root = JSONObject.parseObject(taskNode.getDataJson());
        JSONObject firstOutput = root.getJSONArray("output").getJSONObject(0);
        String name = firstOutput.getString("tableName");

        if (name.charAt(name.length() - 1) == '_') {
            Long stamp = firstOutput.getLong("lastTimeStamp");
            if (stamp == null) {
                stamp = root.getLong("lastTimeStamp");
            }
            if (stamp != null) {
                name = name + stamp;
            }
        }

        return name.contains(".") ? name : "dataset." + name;
    }

    /**
     * Resolves the name of a task's first output table via {@code TaskDTOUtil}.
     *
     * <p>When {@code output[0].tableName} ends with {@code _}, the root-level
     * {@code lastTimeStamp} is appended if present; a name without a {@code .} is prefixed
     * with {@code dataset.}.
     *
     * <p>Earlier revisions additionally recomputed the name through a mapper query and
     * {@code TaskUtil.extractTableStr}. Those variants never contributed a separate result:
     * the mapper variant indexed its string with the length of the already-resolved name
     * and appended the timestamp to the resolved name a second time, and the
     * third value was unused. Only the first computation is kept.
     *
     * @param taskNode task whose output table name is read
     * @return the resolved table name
     */
    public String getOutputTableNameModify(TaskDTO taskNode) {
        String table = TaskDTOUtil.getValueByKey("output[0].tableName", taskNode, String.class);
        if (table.endsWith("_")) {
            Long parentTimeStamp = TaskDTOUtil.getValueByKey("lastTimeStamp", taskNode, Long.class);
            if (null != parentTimeStamp) {
                table = table + parentTimeStamp;
            }
        }
        if (!table.contains(".")) {
            table = "dataset." + table;
        }
        return table;
    }

    /**
     * Caps two numeric form items of a child node by the parent's row count.
     *
     * <p>The row count is read from {@code page.totalElements} of
     * {@code tColumnService.queryDetail} for the parent's output table. Form item 2 of the
     * first {@code setParams} entry gets {@code max = rowCount} and form item 4 gets
     * {@code max = rowCount / 3}. Nothing is changed when the row count is unavailable.
     *
     * @param childData  child node data; modified in place
     * @param parentNode parent task whose output table is counted
     * @return {@code childData}
     */
    public JSONObject rowNumLimit(JSONObject childData, TaskDTO parentNode) {
        String table = this.getOutputTableName(parentNode);

        ColumnQueryVO columnQuery = new ColumnQueryVO();
        columnQuery.setTaskId(parentNode.getId());
        columnQuery.setTable(table);
        columnQuery.setMode(2);
        JSONObject parentTableInfo = tColumnService.queryDetail(columnQuery);

        JSONObject page = parentTableInfo == null ? null : parentTableInfo.getJSONObject("page");
        Integer totalElements = page == null ? null : page.getInteger("totalElements");
        if (totalElements != null) {
            int rowNum = totalElements;
            JSONArray formItem = childData.getJSONArray("setParams").getJSONObject(0).getJSONArray("formItem");
            formItem.getJSONObject(2).getJSONObject("props").put("max", rowNum);
            formItem.getJSONObject(4).getJSONObject("props").put("max", rowNum / 3);
        }
        // TODO: warn (ApiResultCode.SMOOTH_DATA_PRE_REQ) when the parent has 10 rows or fewer.
        return childData;
    }

    /**
     * Lowers the {@code max} of selected form items so it does not exceed the parent's row
     * count minus one.
     *
     * <p>The row count is read from {@code page.totalElements} of
     * {@code tColumnService.queryDetail} for the parent's output table. For each listed form
     * item of the first {@code setParams} entry, {@code max} becomes the smaller of its
     * current value and {@code rowCount - 1}; an item without a {@code max} gets
     * {@code rowCount - 1}. Nothing is changed when the row count is unavailable.
     *
     * @param childData  child node data; modified in place
     * @param parentNode parent task whose output table is counted
     * @param indexes    positions of the form items to cap
     * @return {@code childData}
     */
    public JSONObject generalRowNumLimit(JSONObject childData, TaskDTO parentNode, List<Integer> indexes) {
        String table = this.getOutputTableName(parentNode);

        ColumnQueryVO columnQuery = new ColumnQueryVO();
        columnQuery.setTaskId(parentNode.getId());
        columnQuery.setTable(table);
        columnQuery.setMode(2);
        JSONObject parentTableInfo = tColumnService.queryDetail(columnQuery);

        JSONObject page = parentTableInfo == null ? null : parentTableInfo.getJSONObject("page");
        Integer totalElements = page == null ? null : page.getInteger("totalElements");
        if (totalElements != null) {
            int upperBound = totalElements - 1;
            JSONArray formItem = childData.getJSONArray("setParams").getJSONObject(0).getJSONArray("formItem");
            for (Integer index : indexes) {
                JSONObject props = formItem.getJSONObject(index).getJSONObject("props");
                Integer currentMax = props.getInteger("max");
                // A missing max would otherwise fail on unboxing inside Math.min.
                props.put("max", currentMax == null ? upperBound : Math.min(upperBound, currentMax));
            }
        }
        // TODO: warn (ApiResultCode.SMOOTH_DATA_PRE_REQ) when the parent has 10 rows or fewer.
        return childData;
    }

    /**
     * Rebuilds a child algorithm node's data JSON from its parent and persists it.
     *
     * <p>Depending on the child's {@code algType}, row-count limits are applied first
     * ({@code rowNumLimit} for feature smoothing, {@code generalRowNumLimit} on form items 1
     * and 2 for time-series shift) and the form-item position handed to
     * {@code featureFilter} is chosen. The filter type is always {@code "number"}. Afterwards
     * {@code algType}, {@code isSimple = false} and {@code connected = true} are written and
     * the child is saved through {@code update}.
     *
     * @param parentNode parent task supplying the output columns
     * @param childNode  child task to refresh; its data JSON is replaced
     * @return {@code childNode}
     */
    public TaskDTO dataJsonUpdate(TaskDTO parentNode, TaskDTO childNode) {
        JSONObject childData = JSONObject.parseObject(childNode.getDataJson());
        JSONObject parentData = JSONObject.parseObject(parentNode.getDataJson());
        int algType = childData.getInteger("algType");

        // Position of the form item whose options featureFilter fills in.
        int formItemIndex;
        if (algType == AlgPyEnum.FEATURE_SMOOTHING.getVal()) {
            formItemIndex = 1;
            JSONObject limited = rowNumLimit(childData, parentNode);
            childNode.setDataJson(limited.toString());
        } else if (algType == AlgPyEnum.FEATURE_SCALING.getVal()) {
            formItemIndex = 1;
        } else if (algType == AlgPyEnum.TIMESERIES_DECOMPOSE.getVal()
                || algType == AlgPyEnum.IMPUTATION_STAT.getVal()
                || algType == AlgPyEnum.IMPUTATION_MULTI.getVal()) {
            formItemIndex = 0;
        } else if (algType == AlgPyEnum.ANOMALY_STAT.getVal()) {
            formItemIndex = 1;
        } else if (algType == AlgPyEnum.ANOMALY_KNN.getVal()) {
            formItemIndex = 0;
        } else if (algType == AlgPyEnum.TIMESERIES_SHIFT.getVal()) {
            formItemIndex = 0;
            JSONObject limited = generalRowNumLimit(childData, parentNode, asList(1,2));
            childNode.setDataJson(limited.toString());
        } else {
            formItemIndex = 0;
        }

        JSONObject filtered = featureFilter(childData, parentData, "number", formItemIndex);
        childNode.setDataJson(filtered.toString());

        // algType is re-read from the refreshed node as a string before the final save.
        String algTypeText = childNode.view().getData().getString("algType");
        childData.put("algType", algTypeText);
        childData.put("isSimple", false);
        childData.put("connected", true);
        childNode.setDataJson(childData.toString());

        this.update(childNode);
        return childNode;
    }

    /**
     * Fetches the visualization data of a node.
     *
     * <ul>
     *   <li>Time-series shift: returns the stored {@code cols} / {@code corr} arrays (an empty
     *       object when either is missing) without touching the database.</li>
     *   <li>Anomaly (KNN / stat): returns the stored {@code data_vis} object, or an empty
     *       object when it is missing.</li>
     *   <li>Otherwise the output table is sampled so that roughly 500 rows remain when it is
     *       larger than that, and the result carries {@code head} (column names without the
     *       record-id column) plus, for feature smoothing, {@code data} as rows of
     *       {@code [rowIndex, value...]}, or, for time-series decomposition, one array of
     *       {@code [rowIndex, value]} pairs per column.</li>
     * </ul>
     *
     * <p>The JDBC connection, statement and result set are closed when the query finishes.
     * NOTE(review): this assumes {@code gpDataProvider.getConn} hands out a connection the
     * caller is expected to close (e.g. from a pool) — confirm.
     *
     * @param vo view carrying the id of the task to visualize
     * @return the chart payload, or {@code null} when building or running the query fails
     */
    public JSONObject queryDataVis(TaskVO vo) {
        TaskDTO task = this.queryById(vo.getId());

        String tableName = this.getOutputTableName(task);
        JSONObject res = new JSONObject();
        JSONObject dataJson = task.view().getData();
        int algType = dataJson.getInteger("algType");
        if (algType == AlgPyEnum.TIMESERIES_SHIFT.getVal()) {
            JSONObject charData = new JSONObject();
            if (dataJson.containsKey("cols") && dataJson.containsKey("corr")) {
                charData.put("cols", dataJson.getJSONArray("cols"));
                charData.put("corr", dataJson.getJSONArray("corr"));
            }
            return charData;
        }

        String sql;
        try {
            ColumnQueryVO columnQuery = new ColumnQueryVO();
            columnQuery.setTaskId(vo.getId());
            columnQuery.setTable(tableName);
            columnQuery.setMode(2);
            JSONObject parentTableInfo = tColumnService.queryDetail(columnQuery);
            int rowNum = parentTableInfo.getJSONObject("page").getInteger("totalElements");
            // Keep every interval-th record so that about 500 rows are returned at most.
            int interval = rowNum > 500 ? rowNum / 500 : 1;

            if (algType == AlgPyEnum.ANOMALY_KNN.getVal()
                    || algType == AlgPyEnum.ANOMALY_STAT.getVal()) {
                if (dataJson.containsKey("data_vis")) {
                    return dataJson.getJSONObject("data_vis");
                }
                return res;
            }

            sql = buildDataVisSql(dataJson, algType, tableName, interval);
        } catch (Exception e) {
            logger.error("build data vis query error: ", e);
            return null;
        }

        boolean smoothing = algType == AlgPyEnum.FEATURE_SMOOTHING.getVal();
        boolean decompose = algType == AlgPyEnum.TIMESERIES_DECOMPOSE.getVal();

        // try-with-resources: the original never closed these three JDBC handles.
        try (Connection con = gpDataProvider.getConn(DatabaseConstant.GREEN_PLUM_DATASET_ID);
             PreparedStatement ps = con.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {

            ResultSetMetaData meta = rs.getMetaData();
            int colCount = meta.getColumnCount();
            // Build the head structure; the record-id column is not exposed.
            JSONArray heads = new JSONArray();
            List<String> colNames = new ArrayList<>();
            for (int i = 1; i <= colCount; i++) {
                String name = meta.getColumnName(i);
                if (DatasetConstant.DEFAULT_ID_FIELD.equals(name)) {
                    continue;
                }
                JSONObject head = new JSONObject();
                head.put("name", name);
                heads.add(head);
                colNames.add(name);
            }

            if (smoothing) {
                JSONArray rows = new JSONArray();
                float index = 0;
                while (rs.next()) {
                    index += 1;
                    List<Float> point = new ArrayList<>();
                    point.add(index);
                    for (String colName : colNames) {
                        point.add(rs.getFloat(colName));
                    }
                    rows.add(point);
                }
                res.put("data", rows);
            } else if (decompose) {
                JSONObject series = new JSONObject();
                for (String colName : colNames) {
                    series.put(colName, new JSONArray());
                }
                float index = 0;
                while (rs.next()) {
                    index += 1;
                    for (String colName : colNames) {
                        JSONArray points = series.getJSONArray(colName);
                        List<Float> point = new ArrayList<>();
                        point.add(index);
                        point.add(rs.getFloat(colName));
                        points.add(point);
                        // Published from inside the loop on purpose: as before, a column only
                        // appears in the result once at least one row has been read.
                        res.put(colName, points);
                    }
                }
            }
            res.put("head", heads);
        } catch (Exception e) {
            logger.error("load data vis error: ", e);
            return null;
        }

        return res;
    }

    /**
     * Builds the sampling query used by {@link #queryDataVis(TaskVO)}.
     *
     * <p>NOTE(review): column names come from the node's {@code formData} and are concatenated
     * into the statement as identifiers, which cannot be bound as parameters — confirm they
     * are validated upstream.
     */
    private String buildDataVisSql(JSONObject dataJson, int algType, String tableName, int interval) {
        String columns = "_record_id_";
        String sqlOrder = " order by _record_id_";
        String sampling = "where _record_id_ % " + interval + " = 0";
        JSONObject formData = dataJson.getJSONArray("setParams").getJSONObject(0).getJSONObject("formData");

        if (algType == AlgPyEnum.FEATURE_SMOOTHING.getVal()) {
            String col = formData.getString("col");
            String method = formData.getString("method");
            columns = columns + "," + col + "," + col + "_" + method;
        }

        if (algType == AlgPyEnum.TIMESERIES_DECOMPOSE.getVal()) {
            String col = formData.getString("col");
            String seasonal = col + "_seasonal,";
            String trend = col + "_trend,";
            String residual = col + "_resid";
            columns = columns + "," + col + "," + seasonal + trend + residual;
        }

        // Rows with a null in any selected column are excluded.
        List<String> whereSql = new ArrayList<>();
        whereSql.add(sampling);
        for (String col : columns.split(",")) {
            whereSql.add(col + " is not null");
        }
        return String.format("select %s from %s %s %s", columns, tableName, Joiner.on(" and ").join(whereSql), sqlOrder);
    }


    /**
     * Refreshes the two inputs of a JOIN or UNION node from its parents and saves the node.
     *
     * <p>The node's {@code parentId} must hold exactly two comma-separated task ids. For each
     * parent the output table is resolved (falling back to the latest task instance's table
     * when the recorded one does not exist), the parent's first output replaces the matching
     * {@code input} entry, and {@code tableName} / {@code nodeName} are set on it.
     *
     * @param taskDTO the JOIN / UNION task to update
     * @return the updated task, or {@code null} when it does not have exactly two parents
     */
    @Transactional
    public TaskDTO updateJoinUnionConfig(TaskDTO taskDTO) {
        String[] parentIds = taskDTO.getParentId().split(",");
        if (parentIds.length != 2) {
            logger.error("[UNION|JOIN] task node {} configuration is wrong", taskDTO.getId());
            return null;
        }

        TaskDTO parentA = this.queryById(Long.parseLong(parentIds[0]));
        TaskDTO parentB = this.queryById(Long.parseLong(parentIds[1]));

        String tableA = resolveJoinUnionParentTable(parentA, parentIds[0]);
        logger.warn("join | union table A -> {}", tableA);
        String tableB = resolveJoinUnionParentTable(parentB, parentIds[1]);
        logger.warn("join | union table B -> {}", tableB);

        JSONObject outputA = JSONObject.parseObject(parentA.getDataJson()).getJSONArray("output").getJSONObject(0);
        JSONObject outputB = JSONObject.parseObject(parentB.getDataJson()).getJSONArray("output").getJSONObject(0);

        JSONObject nodeData = JSONObject.parseObject(taskDTO.getDataJson());
        JSONArray inputs = nodeData.getJSONArray("input");
        inputs.set(0, outputA);
        inputs.set(1, outputB);

        inputs.getJSONObject(0).put("tableName", tableA);
        inputs.getJSONObject(1).put("tableName", tableB);
        inputs.getJSONObject(0).put("nodeName", parentA.getName());
        inputs.getJSONObject(1).put("nodeName", parentB.getName());

        nodeData.put("input", inputs);
        taskDTO.setDataJson(nodeData.toJSONString());
        this.update(taskDTO);
        return taskDTO;
    }

    /**
     * Returns the parent's recorded output table when it exists; otherwise takes the table of
     * the latest task instance, writes it back to the parent and returns it.
     */
    private String resolveJoinUnionParentTable(TaskDTO parent, String parentIdText) {
        String recorded = TaskUtil.extractTableStr(parent);
        if (gpDataProvider.checkTableIfExist(new Table(1L, recorded))) {
            return recorded;
        }
        // Fall back to the table of the latest task instance.
        String latest = taskInstanceService.getInstanceTableName(Long.parseLong(parentIdText));
        this.updateTaskOutputTable(parent, latest);
        return latest;
    }

    /**
     * Remedy for sequential execution, where a task's recorded result table can lag behind:
     * stores {@code tableStr} as the table name of the task's first output and saves the task.
     *
     * <p>The {@code output} array is rewritten to contain only that first output.
     *
     * @param taskDTO  task whose data JSON is updated and persisted
     * @param tableStr table name to record
     */
    private void updateTaskOutputTable(TaskDTO taskDTO, String tableStr) {
        JSONObject payload = JSONObject.parseObject(taskDTO.getDataJson());

        JSONObject firstOutput = payload.getJSONArray("output").getJSONObject(0);
        firstOutput.put("tableName", tableStr);

        JSONArray rewrittenOutput = new JSONArray();
        rewrittenOutput.add(firstOutput);
        payload.put("output", rewrittenOutput);

        taskDTO.setDataJson(payload.toJSONString());
        this.update(taskDTO);
    }

    /**
     * Loads the given tasks into a map keyed by task id, with {@code needReverse} set to
     * {@code false}.
     *
     * @param taskIds ids of the tasks to load
     * @return map from task id to the queried task
     */
    public Map<Long, TaskDTO> generateRelationshipMap(List<Long> taskIds) {
        return generateRelationshipMap(taskIds, Boolean.FALSE);
    }

    /**
     * Loads the given tasks into a map keyed by task id.
     *
     * @param taskIds     ids of the tasks to load; each one is fetched with {@code queryById}
     * @param needReverse currently not used by this method
     * @return map from task id to the queried task
     */
    public Map<Long, TaskDTO> generateRelationshipMap(List<Long> taskIds, Boolean needReverse) {
        Map<Long, TaskDTO> tasksById = new HashMap<>();
        taskIds.forEach(taskId -> tasksById.put(taskId, this.queryById(taskId)));
        return tasksById;
    }

    /**
     * Asks {@code taskMapper.queryJsonObjectByIdAndKey} for the value at {@code keyPath} of a
     * task and returns element {@code QUERY_RESULT_IDX} of the answer as a JSON object.
     *
     * @param taskId  id of the task to query
     * @param keyPath key path forwarded to the mapper
     * @return the selected element as a {@link JSONObject}
     */
    public JSONObject queryJsonObjectByIdAndKey(Long taskId, String keyPath){
        JSONObject found = taskMapper.queryJsonObjectByIdAndKey(taskId, keyPath)
                .getJSONObject(QUERY_RESULT_IDX);
        return found;
    }

    /**
     * Asks {@code taskMapper.queryJsonObjectByIdAndKey} for the value at {@code keyPath} of a
     * task and returns element {@code QUERY_RESULT_IDX} of the answer converted to
     * {@code clazz}.
     *
     * @param taskId  id of the task to query
     * @param keyPath key path forwarded to the mapper
     * @param clazz   type the selected element is converted to
     * @return the selected element as an instance of {@code clazz}
     */
    public <T> T queryJsonObjectByIdAndKey(Long taskId, String keyPath, Class<T> clazz){
        T value = taskMapper.queryJsonObjectByIdAndKey(taskId, keyPath)
                .getObject(QUERY_RESULT_IDX, clazz);
        return value;
    }

    /**
     * Asks {@code taskMapper.queryJsonArrayByIdAndKey} for the array at {@code keyPath} of a
     * task and returns element 0 of the answer as a JSON array.
     *
     * <p>Example, reading the {@code tableCols} of a task's first output from its data JSON:
     * {@code JSONArray result = taskService.queryJsonArrayByIdAndKey(taskId, "output[0].tableCols");}
     *
     * @param taskId  id of the task to query
     * @param keyPath key path forwarded to the mapper
     * @return the selected element as a {@link JSONArray}
     */
    public JSONArray queryJsonArrayByIdAndKey(Long taskId, String keyPath){
        JSONArray found = taskMapper.queryJsonArrayByIdAndKey(taskId, keyPath).getJSONArray(0);
        return found;
    }

    /**
     * Asks {@code taskMapper.queryJsonArrayByIdAndKey} for the array at {@code keyPath} of a
     * task and returns element 0 of the answer as a typed list.
     *
     * <p>Example, reading the {@code tableCols} of a task's first output as strings:
     * {@code List<String> result = taskService.queryJsonArrayByIdAndKey(taskId, "output[0].tableCols", String.class);}
     *
     * @param taskId  id of the task to query
     * @param keyPath key path forwarded to the mapper
     * @param clazz   element type of the returned list
     * @return the selected array converted with {@code toJavaList(clazz)}
     */
    public <T> List<T>  queryJsonArrayByIdAndKey(Long taskId, String keyPath, Class<T> clazz){
        JSONArray found = taskMapper.queryJsonArrayByIdAndKey(taskId, keyPath).getJSONArray(0);
        return found.toJavaList(clazz);
    }

    /**
     * Collects the ids of all descendants (children, grandchildren, ...) of a task.
     *
     * <p>The traversal is breadth-first over {@code getChildIdList()}. Every id is expanded at
     * most once, so shared descendants are not queried repeatedly and a cyclic child relation
     * cannot loop forever. The list returned by the DTO is copied rather than modified.
     *
     * @param taskId id of the root task; the root is only part of the result if it is
     *               reachable from one of its own descendants
     * @return the distinct descendant ids in no particular order; empty when the task does not
     *         exist or has no children
     */
    public List<Long> returnAllChildrenIds(Long taskId) {
        Set<Long> result = Sets.newHashSet();
        TaskDTO taskDTO = taskMapper.queryById(taskId);
        List<Long> directChildren = taskDTO == null ? null : taskDTO.getChildIdList();
        if (directChildren == null) {
            return new ArrayList<>(result);
        }

        // Work list that is only appended to; the cursor walks it like a queue.
        List<Long> pending = new ArrayList<>();
        for (Long childId : directChildren) {
            if (result.add(childId)) {
                pending.add(childId);
            }
        }
        for (int cursor = 0; cursor < pending.size(); cursor++) {
            TaskDTO child = taskMapper.queryById(pending.get(cursor));
            List<Long> grandChildren = child == null ? null : child.getChildIdList();
            if (grandChildren == null) {
                continue;
            }
            for (Long grandChildId : grandChildren) {
                // Set.add returns false for ids already seen, which stops re-expansion.
                if (result.add(grandChildId)) {
                    pending.add(grandChildId);
                }
            }
        }
        return new ArrayList<>(result);
    }
}